From bb6e8ba6829eab24444d6fe6950160d05be7d96c Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Fri, 23 Feb 2018 16:19:37 +0200 Subject: [PATCH 01/17] Initial command filter experiment. --- src/module.c | 76 +++++++++++++++++++++++++++++++++++++++ src/modules/Makefile | 6 +++- src/modules/hellofilter.c | 69 +++++++++++++++++++++++++++++++++++ src/redismodule.h | 8 +++++ src/server.c | 2 ++ src/server.h | 2 +- 6 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 src/modules/hellofilter.c diff --git a/src/module.c b/src/module.c index e69d3dc61..1780342ed 100644 --- a/src/module.c +++ b/src/module.c @@ -270,6 +270,28 @@ typedef struct RedisModuleDictIter { raxIterator ri; } RedisModuleDictIter; +/* Information about the command to be executed, as passed to and from a + * filter. */ +typedef struct RedisModuleFilteredCommand { + RedisModuleString **argv; + int argc; +} RedisModuleFilteredCommand; + +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd); + +typedef struct RedisModuleCommandFilter { + /* The module that registered the filter */ + RedisModule *module; + /* Filter callback function */ + RedisModuleCommandFilterFunc callback; + /* Indicates a filter is active, avoid reentrancy */ + int active; +} RedisModuleCommandFilter; + +/* Registered filters */ +static list *moduleCommandFilters; + + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -4770,6 +4792,56 @@ int moduleUnregisterUsedAPI(RedisModule *module) { return count; } +/* -------------------------------------------------------------------------- + * Module Command Filter API + * -------------------------------------------------------------------------- */ + +/* Register a new command filter function. Filters get executed by Redis + * before processing an inbound command and can be used to manipulate the + * behavior of standard Redis commands. Filters must not attempt to + * perform Redis commands or operate on the dataset, and must restrict + * themselves to manipulation of the arguments. + */ + +int RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { + RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter)); + filter->module = ctx->module; + filter->callback = callback; + filter->active = 0; + + listAddNodeTail(moduleCommandFilters, filter); + return REDISMODULE_OK; +} + +void moduleCallCommandFilters(client *c) { + if (listLength(moduleCommandFilters) == 0) return; + + listIter li; + listNode *ln; + listRewind(moduleCommandFilters,&li); + + RedisModuleFilteredCommand cmd = { + .argv = c->argv, + .argc = c->argc + }; + + while((ln = listNext(&li))) { + RedisModuleCommandFilter *filter = ln->value; + if (filter->active) continue; + + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = filter->module; + + filter->active = 1; + filter->callback(&ctx, &cmd); + filter->active = 0; + moduleFreeContext(&ctx); + } + + c->argv = cmd.argv; + c->argc = cmd.argc; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -4816,6 +4888,9 @@ void moduleInitModulesSystem(void) { moduleFreeContextReusedClient->flags |= CLIENT_MODULE; moduleFreeContextReusedClient->user = NULL; /* root user. */ + /* Set up filter list */ + moduleCommandFilters = listCreate(); + moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, @@ -5219,4 +5294,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DictCompare); REGISTER_API(ExportSharedAPI); REGISTER_API(GetSharedAPI); + REGISTER_API(RegisterCommandFilter); } diff --git a/src/modules/Makefile b/src/modules/Makefile index 51ffac17d..537aa0daf 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so +all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -46,6 +46,10 @@ hellotimer.so: hellotimer.xo hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo + +hellofilter.xo: ../redismodule.h + +hellofilter.so: hellofilter.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc testmodule.xo: ../redismodule.h diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c new file mode 100644 index 000000000..c9e33158f --- /dev/null +++ b/src/modules/hellofilter.c @@ -0,0 +1,69 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "../redismodule.h" + +static RedisModuleString *log_key_name; + +static const char log_command_name[] = "hellofilter.log"; + +int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModuleString *s = RedisModule_CreateStringFromString(ctx, argv[0]); + + int i; + for (i = 1; i < argc; i++) { + size_t arglen; + const char *arg = RedisModule_StringPtrLen(argv[i], &arglen); + + RedisModule_StringAppendBuffer(ctx, s, " ", 1); + RedisModule_StringAppendBuffer(ctx, s, arg, arglen); + } + + RedisModuleKey *log = RedisModule_OpenKey(ctx, log_key_name, REDISMODULE_WRITE|REDISMODULE_READ); + RedisModule_ListPush(log, REDISMODULE_LIST_HEAD, s); + RedisModule_CloseKey(log); + RedisModule_FreeString(ctx, s); + + size_t cmdlen; + const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen); + RedisModuleCallReply *reply = RedisModule_Call(ctx, cmdname, "v", &argv[2], argc - 2); + if (reply) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + } else { + RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); + } + return REDISMODULE_OK; +} + +void HelloFilter_CommandFilter(RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd) +{ + cmd->argv = RedisModule_Realloc(cmd->argv, (cmd->argc+1)*sizeof(RedisModuleString *)); + int i; + + for (i = cmd->argc; i > 0; i--) { + cmd->argv[i] = cmd->argv[i-1]; + } + cmd->argv[0] = RedisModule_CreateString(ctx, log_command_name, sizeof(log_command_name)-1); + cmd->argc++; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (argc != 1) { + RedisModule_Log(ctx, "warning", "Log key name not specified"); + return REDISMODULE_ERR; + } + + log_key_name = RedisModule_CreateStringFromString(ctx, argv[0]); + + if (RedisModule_CreateCommand(ctx,log_command_name, + HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/src/redismodule.h b/src/redismodule.h index 272da08df..54ce99d96 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -163,6 +163,12 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); +typedef struct RedisModuleFilteredCommand { + RedisModuleString **argv; + int argc; +} RedisModuleFilteredCommand; +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd); + #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { uint64_t version; @@ -337,6 +343,7 @@ void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedC void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); +int REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb); #endif /* This is included inline inside each Redis module. */ @@ -499,6 +506,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SetClusterFlags); REDISMODULE_GET_API(ExportSharedAPI); REDISMODULE_GET_API(GetSharedAPI); + REDISMODULE_GET_API(RegisterCommandFilter); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/server.c b/src/server.c index 712cda1bd..66e79dea3 100644 --- a/src/server.c +++ b/src/server.c @@ -3268,6 +3268,8 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { + moduleCallCommandFilters(c); + /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in diff --git a/src/server.h b/src/server.h index 56c3b67d3..f55213bfc 100644 --- a/src/server.h +++ b/src/server.h @@ -1489,7 +1489,7 @@ size_t moduleCount(void); void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); - +void moduleCallCommandFilters(client *c); /* Utils */ long long ustime(void); From bc47c987d68ade4f481122cfd6cec639dbba91ad Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 18 Mar 2019 18:36:46 +0200 Subject: [PATCH 02/17] Add command filtering argument handling API. --- src/module.c | 81 +++++++++++++++++++++++++++++++++++++++ src/modules/hellofilter.c | 46 ++++++++++++++++++---- src/redismodule.h | 18 ++++++--- 3 files changed, 132 insertions(+), 13 deletions(-) diff --git a/src/module.c b/src/module.c index 1780342ed..741c546b7 100644 --- a/src/module.c +++ b/src/module.c @@ -291,6 +291,10 @@ typedef struct RedisModuleCommandFilter { /* Registered filters */ static list *moduleCommandFilters; +typedef struct RedisModuleCommandFilterCtx { + RedisModuleString **argv; + int argc; +} RedisModuleCommandFilterCtx; /* -------------------------------------------------------------------------- * Prototypes @@ -4842,6 +4846,78 @@ void moduleCallCommandFilters(client *c) { c->argc = cmd.argc; } +/* Return the number of arguments a filtered command has. The number of + * arguments include the command itself. + */ +int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *filter) +{ + return filter->argc; +} + +/* Return the specified command argument. The first argument (position 0) is + * the command itself, and the rest are user-provided args. + */ +const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *filter, int pos) +{ + if (pos < 0 || pos >= filter->argc) return NULL; + return filter->argv[pos]; +} + +/* Modify the filtered command by inserting a new argument at the specified + * position. The specified RedisModuleString argument may be used by Redis + * after the filter context is destroyed, so it must not be auto-memory + * allocated, freed or used elsewhere. + */ + +int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg) +{ + int i; + + if (pos < 0 || pos > filter->argc) return REDISMODULE_ERR; + + filter->argv = zrealloc(filter->argv, (filter->argc+1)*sizeof(RedisModuleString *)); + for (i = filter->argc; i > pos; i--) { + filter->argv[i] = filter->argv[i-1]; + } + filter->argv[pos] = arg; + filter->argc++; + + return REDISMODULE_OK; +} + +/* Modify the filtered command by replacing an existing argument with a new one. + * The specified RedisModuleString argument may be used by Redis after the + * filter context is destroyed, so it must not be auto-memory allocated, freed + * or used elsewhere. + */ + +int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg) +{ + if (pos < 0 || pos >= filter->argc) return REDISMODULE_ERR; + + decrRefCount(filter->argv[pos]); + filter->argv[pos] = arg; + + return REDISMODULE_OK; +} + +/* Modify the filtered command by deleting an argument at the specified + * position. + */ +int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *filter, int pos) +{ + int i; + if (pos < 0 || pos >= filter->argc) return REDISMODULE_ERR; + + decrRefCount(filter->argv[pos]); + for (i = pos; i < filter->argc-1; i++) { + filter->argv[i] = filter->argv[i+1]; + } + filter->argc--; + + return REDISMODULE_OK; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -5295,4 +5371,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ExportSharedAPI); REGISTER_API(GetSharedAPI); REGISTER_API(RegisterCommandFilter); + REGISTER_API(CommandFilterArgsCount); + REGISTER_API(CommandFilterArgGet); + REGISTER_API(CommandFilterArgInsert); + REGISTER_API(CommandFilterArgReplace); + REGISTER_API(CommandFilterArgDelete); } diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c index c9e33158f..84eb02c30 100644 --- a/src/modules/hellofilter.c +++ b/src/modules/hellofilter.c @@ -1,6 +1,8 @@ #define REDISMODULE_EXPERIMENTAL_API #include "../redismodule.h" +#include + static RedisModuleString *log_key_name; static const char log_command_name[] = "hellofilter.log"; @@ -35,16 +37,46 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar return REDISMODULE_OK; } -void HelloFilter_CommandFilter(RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd) +void HelloFilter_CommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterCtx *filter) { - cmd->argv = RedisModule_Realloc(cmd->argv, (cmd->argc+1)*sizeof(RedisModuleString *)); - int i; + (void) ctx; - for (i = cmd->argc; i > 0; i--) { - cmd->argv[i] = cmd->argv[i-1]; + /* Fun manipulations: + * - Remove @delme + * - Replace @replaceme + * - Append @insertbefore or @insertafter + * - Prefix with Log command if @log encounterd + */ + int log = 0; + int pos = 0; + while (pos < RedisModule_CommandFilterArgsCount(filter)) { + const RedisModuleString *arg = RedisModule_CommandFilterArgGet(filter, pos); + size_t arg_len; + const char *arg_str = RedisModule_StringPtrLen(arg, &arg_len); + + if (arg_len == 6 && !memcmp(arg_str, "@delme", 6)) { + RedisModule_CommandFilterArgDelete(filter, pos); + continue; + } + if (arg_len == 10 && !memcmp(arg_str, "@replaceme", 10)) { + RedisModule_CommandFilterArgReplace(filter, pos, + RedisModule_CreateString(NULL, "--replaced--", 12)); + } else if (arg_len == 13 && !memcmp(arg_str, "@insertbefore", 13)) { + RedisModule_CommandFilterArgInsert(filter, pos, + RedisModule_CreateString(NULL, "--inserted-before--", 19)); + pos++; + } else if (arg_len == 12 && !memcmp(arg_str, "@insertafter", 12)) { + RedisModule_CommandFilterArgInsert(filter, pos + 1, + RedisModule_CreateString(NULL, "--inserted-after--", 18)); + pos++; + } else if (arg_len == 4 && !memcmp(arg_str, "@log", 4)) { + log = 1; + } + pos++; } - cmd->argv[0] = RedisModule_CreateString(ctx, log_command_name, sizeof(log_command_name)-1); - cmd->argc++; + + if (log) RedisModule_CommandFilterArgInsert(filter, 0, + RedisModule_CreateString(NULL, log_command_name, sizeof(log_command_name)-1)); } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { diff --git a/src/redismodule.h b/src/redismodule.h index 54ce99d96..426a6df69 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -150,6 +150,7 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDictIter RedisModuleDictIter; +typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -162,12 +163,7 @@ typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); - -typedef struct RedisModuleFilteredCommand { - RedisModuleString **argv; - int argc; -} RedisModuleFilteredCommand; -typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd); +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleCommandFilterCtx *filter); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { @@ -344,6 +340,11 @@ void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); int REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *filter); +const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *filter, int pos); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *filter, int pos); #endif /* This is included inline inside each Redis module. */ @@ -507,6 +508,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ExportSharedAPI); REDISMODULE_GET_API(GetSharedAPI); REDISMODULE_GET_API(RegisterCommandFilter); + REDISMODULE_GET_API(CommandFilterArgsCount); + REDISMODULE_GET_API(CommandFilterArgGet); + REDISMODULE_GET_API(CommandFilterArgInsert); + REDISMODULE_GET_API(CommandFilterArgReplace); + REDISMODULE_GET_API(CommandFilterArgDelete); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; From 95881cec60b54ab00b68db306a9b0078aefd6036 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 18 Mar 2019 19:34:52 +0200 Subject: [PATCH 03/17] Add command filter Module API tests. --- tests/modules/commandfilter.tcl | 27 +++++++++++++++++++++++++++ tests/test_helper.tcl | 1 + 2 files changed, 28 insertions(+) create mode 100644 tests/modules/commandfilter.tcl diff --git a/tests/modules/commandfilter.tcl b/tests/modules/commandfilter.tcl new file mode 100644 index 000000000..f0d96b259 --- /dev/null +++ b/tests/modules/commandfilter.tcl @@ -0,0 +1,27 @@ +set testmodule [file normalize src/modules/hellofilter.so] + +start_server {tags {"modules"}} { + r module load $testmodule log-key + + test {Command Filter handles redirected commands} { + r set mykey @log + r lrange log-key 0 -1 + } "{hellofilter.log set mykey @log}" + + test {Command Filter can call RedisModule_CommandFilterArgDelete} { + r rpush mylist elem1 @delme elem2 + r lrange mylist 0 -1 + } {elem1 elem2} + + test {Command Filter can call RedisModule_CommandFilterArgInsert} { + r del mylist + r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 + r lrange mylist 0 -1 + } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} + + test {Command Filter can call RedisModule_CommandFilterArgReplace} { + r del mylist + r rpush mylist elem1 @replaceme elem2 + r lrange mylist 0 -1 + } {elem1 --replaced-- elem2} +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 568eacdee..d2f281526 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,6 +63,7 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf + modules/commandfilter } # Index to the next test to run in the ::all_tests list. set ::next_test 0 From fdacd1b0b5a3a9ad1bd3ba5c1e93c51fa31b74a9 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 18 Mar 2019 23:05:52 +0200 Subject: [PATCH 04/17] CommandFilter API: More cleanup. --- src/module.c | 37 +++++++++---------------------------- src/redismodule.h | 2 +- 2 files changed, 10 insertions(+), 29 deletions(-) diff --git a/src/module.c b/src/module.c index 741c546b7..c6cb8a0ca 100644 --- a/src/module.c +++ b/src/module.c @@ -270,32 +270,23 @@ typedef struct RedisModuleDictIter { raxIterator ri; } RedisModuleDictIter; -/* Information about the command to be executed, as passed to and from a - * filter. */ -typedef struct RedisModuleFilteredCommand { +typedef struct RedisModuleCommandFilterCtx { RedisModuleString **argv; int argc; -} RedisModuleFilteredCommand; +} RedisModuleCommandFilterCtx; -typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd); +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef struct RedisModuleCommandFilter { /* The module that registered the filter */ RedisModule *module; /* Filter callback function */ RedisModuleCommandFilterFunc callback; - /* Indicates a filter is active, avoid reentrancy */ - int active; } RedisModuleCommandFilter; /* Registered filters */ static list *moduleCommandFilters; -typedef struct RedisModuleCommandFilterCtx { - RedisModuleString **argv; - int argc; -} RedisModuleCommandFilterCtx; - /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -4802,16 +4793,13 @@ int moduleUnregisterUsedAPI(RedisModule *module) { /* Register a new command filter function. Filters get executed by Redis * before processing an inbound command and can be used to manipulate the - * behavior of standard Redis commands. Filters must not attempt to - * perform Redis commands or operate on the dataset, and must restrict - * themselves to manipulation of the arguments. + * behavior of standard Redis commands. */ int RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter)); filter->module = ctx->module; filter->callback = callback; - filter->active = 0; listAddNodeTail(moduleCommandFilters, filter); return REDISMODULE_OK; @@ -4824,26 +4812,19 @@ void moduleCallCommandFilters(client *c) { listNode *ln; listRewind(moduleCommandFilters,&li); - RedisModuleFilteredCommand cmd = { + RedisModuleCommandFilterCtx filter = { .argv = c->argv, .argc = c->argc }; while((ln = listNext(&li))) { - RedisModuleCommandFilter *filter = ln->value; - if (filter->active) continue; + RedisModuleCommandFilter *f = ln->value; - RedisModuleCtx ctx = REDISMODULE_CTX_INIT; - ctx.module = filter->module; - - filter->active = 1; - filter->callback(&ctx, &cmd); - filter->active = 0; - moduleFreeContext(&ctx); + f->callback(&filter); } - c->argv = cmd.argv; - c->argc = cmd.argc; + c->argv = filter.argv; + c->argc = filter.argc; } /* Return the number of arguments a filtered command has. The number of diff --git a/src/redismodule.h b/src/redismodule.h index 426a6df69..5df83ae6a 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -163,7 +163,7 @@ typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); -typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleCommandFilterCtx *filter); +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { From 5bd8aae664437cace61a422434978d8bb3740ed1 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 18 Mar 2019 23:06:38 +0200 Subject: [PATCH 05/17] CommandFilter API: Support Lua and RM_call() flows. --- src/module.c | 20 +++++++++++++------- src/scripting.c | 5 +++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/module.c b/src/module.c index c6cb8a0ca..17accfb70 100644 --- a/src/module.c +++ b/src/module.c @@ -2741,12 +2741,6 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch RedisModuleCallReply *reply = NULL; int replicate = 0; /* Replicate this command? */ - cmd = lookupCommandByCString((char*)cmdname); - if (!cmd) { - errno = EINVAL; - return NULL; - } - /* Create the client and dispatch the command. */ va_start(ap, fmt); c = createClient(-1); @@ -2760,11 +2754,23 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->db = ctx->client->db; c->argv = argv; c->argc = argc; - c->cmd = c->lastcmd = cmd; /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) goto cleanup; + /* Call command filters */ + moduleCallCommandFilters(c); + + /* Lookup command now, after filters had a chance to make modifications + * if necessary. + */ + cmd = lookupCommand(c->argv[0]->ptr); + if (!cmd) { + errno = EINVAL; + goto cleanup; + } + c->cmd = c->lastcmd = cmd; + /* Basic arity checks. */ if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { errno = EINVAL; diff --git a/src/scripting.c b/src/scripting.c index cbbf43fb1..032bfdf10 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -462,6 +462,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = argc; c->user = server.lua_caller->user; + /* Process module hooks */ + moduleCallCommandFilters(c); + argv = c->argv; + argc = c->argc; + /* Log the command if debugging is active. */ if (ldb.active && ldb.step) { sds cmdlog = sdsnew(""); From 06a6d70ab59af885c4cbfffdc023a1ad05bfa5df Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 18 Mar 2019 23:07:28 +0200 Subject: [PATCH 06/17] CommandFilter API: hellofilter and tests. --- src/modules/hellofilter.c | 32 ++++++++++++++++++++++++++++---- tests/modules/commandfilter.tcl | 20 +++++++++++++++++++- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c index 84eb02c30..d5dd405aa 100644 --- a/src/modules/hellofilter.c +++ b/src/modules/hellofilter.c @@ -6,17 +6,32 @@ static RedisModuleString *log_key_name; static const char log_command_name[] = "hellofilter.log"; +static const char ping_command_name[] = "hellofilter.ping"; +static int in_module = 0; + +int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModuleCallReply *reply = RedisModule_Call(ctx, "ping", "c", "@log"); + if (reply) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + } else { + RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); + } + + return REDISMODULE_OK; +} int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - RedisModuleString *s = RedisModule_CreateStringFromString(ctx, argv[0]); + RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); int i; for (i = 1; i < argc; i++) { size_t arglen; const char *arg = RedisModule_StringPtrLen(argv[i], &arglen); - RedisModule_StringAppendBuffer(ctx, s, " ", 1); + if (i > 1) RedisModule_StringAppendBuffer(ctx, s, " ", 1); RedisModule_StringAppendBuffer(ctx, s, arg, arglen); } @@ -25,6 +40,8 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar RedisModule_CloseKey(log); RedisModule_FreeString(ctx, s); + in_module = 1; + size_t cmdlen; const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen); RedisModuleCallReply *reply = RedisModule_Call(ctx, cmdname, "v", &argv[2], argc - 2); @@ -34,12 +51,15 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar } else { RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); } + + in_module = 0; + return REDISMODULE_OK; } -void HelloFilter_CommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterCtx *filter) +void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) { - (void) ctx; + if (in_module) return; /* don't process our own RM_Call() */ /* Fun manipulations: * - Remove @delme @@ -94,6 +114,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,ping_command_name, + HelloFilter_PingCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter) == REDISMODULE_ERR) return REDISMODULE_ERR; diff --git a/tests/modules/commandfilter.tcl b/tests/modules/commandfilter.tcl index f0d96b259..47d9c302c 100644 --- a/tests/modules/commandfilter.tcl +++ b/tests/modules/commandfilter.tcl @@ -6,7 +6,7 @@ start_server {tags {"modules"}} { test {Command Filter handles redirected commands} { r set mykey @log r lrange log-key 0 -1 - } "{hellofilter.log set mykey @log}" + } "{set mykey @log}" test {Command Filter can call RedisModule_CommandFilterArgDelete} { r rpush mylist elem1 @delme elem2 @@ -24,4 +24,22 @@ start_server {tags {"modules"}} { r rpush mylist elem1 @replaceme elem2 r lrange mylist 0 -1 } {elem1 --replaced-- elem2} + + test {Command Filter applies on RM_Call() commands} { + r del log-key + r hellofilter.ping + r lrange log-key 0 -1 + } "{ping @log}" + + test {Command Filter applies on Lua redis.call()} { + r del log-key + r eval "redis.call('ping', '@log')" 0 + r lrange log-key 0 -1 + } "{ping @log}" + + test {Command Filter applies on Lua redis.call() that calls a module} { + r del log-key + r eval "redis.call('hellofilter.ping')" 0 + r lrange log-key 0 -1 + } "{ping @log}" } From 278c7a6b6d679dfda65a35c098e1fd763a974b86 Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Tue, 19 Mar 2019 13:11:37 +0200 Subject: [PATCH 07/17] Added keyspace miss notifications support --- src/db.c | 7 ++++++- src/modules/testmodule.c | 29 ++++++++++++++++++++++------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/db.c b/src/db.c index 7950d5074..8c9047084 100644 --- a/src/db.c +++ b/src/db.c @@ -83,6 +83,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * 1. A key gets expired if it reached it's TTL. * 2. The key last access time is updated. * 3. The global keys hits/misses stats are updated (reported in INFO). + * 4. If keyspace notifications are enabled, a "miss" notification is fired. * * This API should not be used when we write to the key after obtaining * the object linked to the key, but only for read only operations. @@ -106,6 +107,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * to return NULL ASAP. */ if (server.masterhost == NULL) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); return NULL; } @@ -127,12 +129,15 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); return NULL; } } val = lookupKey(db,key,flags); - if (val == NULL) + if (val == NULL) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); + } else server.stat_keyspace_hits++; return val; diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index 67a861704..826dd9a7e 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -109,9 +109,9 @@ int TestStringPrintf(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 3) { return RedisModule_WrongArity(ctx); } - RedisModuleString *s = RedisModule_CreateStringPrintf(ctx, - "Got %d args. argv[1]: %s, argv[2]: %s", - argc, + RedisModuleString *s = RedisModule_CreateStringPrintf(ctx, + "Got %d args. argv[1]: %s, argv[2]: %s", + argc, RedisModule_StringPtrLen(argv[1], NULL), RedisModule_StringPtrLen(argv[2], NULL) ); @@ -133,7 +133,7 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleKey *k = RedisModule_OpenKey(ctx, RedisModule_CreateStringPrintf(ctx, "unlinked"), REDISMODULE_WRITE | REDISMODULE_READ); if (!k) return failTest(ctx, "Could not create key"); - + if (REDISMODULE_ERR == RedisModule_StringSet(k, RedisModule_CreateStringPrintf(ctx, "Foobar"))) { return failTest(ctx, "Could not set string value"); } @@ -152,7 +152,7 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return failTest(ctx, "Could not verify key to be unlinked"); } return RedisModule_ReplyWithSimpleString(ctx, "OK"); - + } int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, @@ -188,6 +188,10 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + /* Miss some keys intentionally so we will get a "miss" notification. */ + RedisModule_Call(ctx, "GET", "c", "nosuchkey"); + RedisModule_Call(ctx, "SMEMBERS", "c", "nosuchkey"); + size_t sz; const char *rep; RedisModuleCallReply *r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo"); @@ -225,6 +229,16 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { FAIL("Wrong reply for l"); } + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "nosuchkey"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for nosuchkey"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%.*s'. expected '2'", sz, rep); + } + } + RedisModule_Call(ctx, "FLUSHDB", ""); return RedisModule_ReplyWithSimpleString(ctx, "OK"); @@ -423,7 +437,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"test.ctxflags", TestCtxFlags,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - + if (RedisModule_CreateCommand(ctx,"test.unlink", TestUnlink,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; @@ -435,7 +449,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_SET | - REDISMODULE_NOTIFY_STRING, + REDISMODULE_NOTIFY_STRING | + REDISMODULE_NOTIFY_GENERIC, NotifyCallback); if (RedisModule_CreateCommand(ctx,"test.notify", TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) From 1da0d9b04cded3f65ca7c6aecbee07c02d118696 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Tue, 19 Mar 2019 19:48:47 +0200 Subject: [PATCH 08/17] CommandFilter API: Extend documentation. --- src/module.c | 48 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/src/module.c b/src/module.c index 17accfb70..ee2840225 100644 --- a/src/module.c +++ b/src/module.c @@ -4797,9 +4797,47 @@ int moduleUnregisterUsedAPI(RedisModule *module) { * Module Command Filter API * -------------------------------------------------------------------------- */ -/* Register a new command filter function. Filters get executed by Redis - * before processing an inbound command and can be used to manipulate the - * behavior of standard Redis commands. +/* Register a new command filter function. + * + * Command filtering makes it possible for modules to extend Redis by plugging + * into the execution flow of all commands. + * + * A registered filter gets called before Redis executes *any* command. This + * includes both core Redis commands and commands registered by any module. The + * filter applies in all execution paths including: + * + * 1. Invocation by a client. + * 2. Invocation through `RedisModule_Call()` by any module. + * 3. Invocation through Lua 'redis.call()`. + * 4. Replication of a command from a master. + * + * The filter executes in a special filter context, which is different and more + * limited than a RedisModuleCtx. Because the filter affects any command, it + * must be implemented in a very efficient way to reduce the performance impact + * on Redis. All Redis Module API calls that require a valid context (such as + * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a + * filter context. + * + * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the + * executed command and its arguments. As the filter executes before Redis + * begins processing the command, any change will affect the way the command is + * processed. For example, a module can override Redis commands this way: + * + * 1. Register a `MODULE.SET` command which implements an extended version of + * the Redis `SET` command. + * 2. Register a command filter which detects invocation of `SET` on a specific + * pattern of keys. Once detected, the filter will replace the first + * argument from `SET` to `MODULE.SET`. + * 3. When filter execution is complete, Redis considers the new command name + * and therefore executes the module's own command. + * + * Note that in the above use case, if `MODULE.SET` itself uses + * `RedisModule_Call()` the filter will be applied on that call as well. If + * that is not desired, the module itself is responsible for maintaining a flag + * to identify and avoid this form of re-entrancy. + * + * If multiple filters are registered (by the same or different modules), they + * are executed in the order of registration. */ int RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { @@ -4881,7 +4919,7 @@ int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *filter, int pos, Redi int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg) { if (pos < 0 || pos >= filter->argc) return REDISMODULE_ERR; - + decrRefCount(filter->argv[pos]); filter->argv[pos] = arg; @@ -4901,7 +4939,7 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *filter, int pos) filter->argv[i] = filter->argv[i+1]; } filter->argc--; - + return REDISMODULE_OK; } From 544b9b08265fbb1696ec4a27a4c09035cd099a94 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 20 Mar 2019 17:46:19 +0200 Subject: [PATCH 09/17] diskless replication - notify slave when rdb transfer failed in diskless replication - master was not notifing the slave that rdb transfer terminated on error, and lets slave wait for replication timeout --- src/replication.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.c b/src/replication.c index f2adc7995..8f0d67914 100644 --- a/src/replication.c +++ b/src/replication.c @@ -593,6 +593,7 @@ int startBgsaveForReplication(int mincapa) { client *slave = ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + slave->replstate = REPL_STATE_NONE; slave->flags &= ~CLIENT_SLAVE; listDelNode(server.slaves,ln); addReplyError(slave, From 50befc42ad1523942b82af81e6722b66744825f9 Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Thu, 21 Mar 2019 11:47:14 +0200 Subject: [PATCH 10/17] added special flag for keyspace miss notifications --- src/db.c | 6 +++--- src/modules/testmodule.c | 2 +- src/notify.c | 6 ++++-- src/redismodule.h | 1 + src/server.h | 4 +++- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/db.c b/src/db.c index 8c9047084..afe181281 100644 --- a/src/db.c +++ b/src/db.c @@ -107,7 +107,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * to return NULL ASAP. */ if (server.masterhost == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); return NULL; } @@ -129,14 +129,14 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); return NULL; } } val = lookupKey(db,key,flags); if (val == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_GENERIC, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); } else server.stat_keyspace_hits++; diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index 826dd9a7e..af78d21d7 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -450,7 +450,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_STRING | - REDISMODULE_NOTIFY_GENERIC, + REDISMODULE_NOTIFY_KEY_MISS, NotifyCallback); if (RedisModule_CreateCommand(ctx,"test.notify", TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) diff --git a/src/notify.c b/src/notify.c index 1afb36fc0..d6c3ad403 100644 --- a/src/notify.c +++ b/src/notify.c @@ -55,6 +55,7 @@ int keyspaceEventsStringToFlags(char *classes) { case 'K': flags |= NOTIFY_KEYSPACE; break; case 'E': flags |= NOTIFY_KEYEVENT; break; case 't': flags |= NOTIFY_STREAM; break; + case 'm': flags |= NOTIFY_KEY_MISS; break; default: return -1; } } @@ -81,6 +82,7 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); + if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); @@ -100,12 +102,12 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { int len = -1; char buf[24]; - /* If any modules are interested in events, notify the module system now. + /* If any modules are interested in events, notify the module system now. * This bypasses the notifications configuration, but the module engine * will only call event subscribers if the event type matches the types * they are interested in. */ moduleNotifyKeyspaceEvent(type, event, key, dbid); - + /* If notifications for this class of events are off, return ASAP. */ if (!(server.notify_keyspace_events & type)) return; diff --git a/src/redismodule.h b/src/redismodule.h index 272da08df..681bd600b 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -98,6 +98,7 @@ #define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ +#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */ #define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ diff --git a/src/server.h b/src/server.h index 56c3b67d3..0b433039d 100644 --- a/src/server.h +++ b/src/server.h @@ -468,7 +468,9 @@ typedef long long mstime_t; /* millisecond time type. */ #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_STREAM (1<<10) /* t */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ +#define NOTIFY_KEY_MISS (1<<11) /* m */ + +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) From ca2eadaaac824d329c877820cd5d159607e0227b Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Thu, 21 Mar 2019 12:47:51 +0200 Subject: [PATCH 11/17] Added missing REDISMODULE_NOTIFY_KEY_MISS flag to REDISMODULE_NOTIFY_ALL --- src/redismodule.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redismodule.h b/src/redismodule.h index 681bd600b..70011f932 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -99,7 +99,7 @@ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ #define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */ -#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */ /* A special pointer that we can use between the core and the module to signal From 51a54dfde3c27e162dfedcddb663350575cd7733 Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Thu, 21 Mar 2019 12:48:37 +0200 Subject: [PATCH 12/17] remove extra linebreak --- src/server.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server.h b/src/server.h index 0b433039d..b090a6374 100644 --- a/src/server.h +++ b/src/server.h @@ -469,7 +469,6 @@ typedef long long mstime_t; /* millisecond time type. */ #define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_STREAM (1<<10) /* t */ #define NOTIFY_KEY_MISS (1<<11) /* m */ - #define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */ /* Get the first bind addr or NULL */ From c675d44488094bc6e360f8a330375d3b9fb3b335 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Thu, 21 Mar 2019 14:44:49 +0200 Subject: [PATCH 13/17] CommandFilter API: Add unregister option. A filter handle is returned and can be used to unregister a filter. In the future it can also be used to further configure or manipulate the filter. Filters are now automatically unregistered when a module unloads. --- src/module.c | 94 +++++++++++++++++++++++++-------- src/modules/hellofilter.c | 27 ++++++++-- src/redismodule.h | 15 +++--- tests/modules/commandfilter.tcl | 22 ++++++++ 4 files changed, 126 insertions(+), 32 deletions(-) diff --git a/src/module.c b/src/module.c index ee2840225..ad7bba2eb 100644 --- a/src/module.c +++ b/src/module.c @@ -49,6 +49,7 @@ struct RedisModule { list *types; /* Module data types. */ list *usedby; /* List of modules using APIs from this one. */ list *using; /* List of modules we use some APIs of. */ + list *filters; /* List of filters the module has registered. */ }; typedef struct RedisModule RedisModule; @@ -748,6 +749,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->types = listCreate(); module->usedby = listCreate(); module->using = listCreate(); + module->filters = listCreate(); ctx->module = module; } @@ -4793,6 +4795,28 @@ int moduleUnregisterUsedAPI(RedisModule *module) { return count; } +/* Unregister all filters registered by a module. + * This is called when a module is being unloaded. + * + * Returns the number of filters unregistered. */ +int moduleUnregisterFilters(RedisModule *module) { + listIter li; + listNode *ln; + int count = 0; + + listRewind(module->filters,&li); + while((ln = listNext(&li))) { + RedisModuleCommandFilter *filter = ln->value; + listNode *ln = listSearchKey(moduleCommandFilters,filter); + if (ln) { + listDelNode(moduleCommandFilters,ln); + count++; + } + zfree(filter); + } + return count; +} + /* -------------------------------------------------------------------------- * Module Command Filter API * -------------------------------------------------------------------------- */ @@ -4840,12 +4864,33 @@ int moduleUnregisterUsedAPI(RedisModule *module) { * are executed in the order of registration. */ -int RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { +RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter)); filter->module = ctx->module; filter->callback = callback; listAddNodeTail(moduleCommandFilters, filter); + listAddNodeTail(ctx->module->filters, filter); + return filter; +} + +/* Unregister a command filter. + */ +int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) { + listNode *ln; + + /* A module can only remove its own filters */ + if (filter->module != ctx->module) return REDISMODULE_ERR; + + ln = listSearchKey(moduleCommandFilters,filter); + if (!ln) return REDISMODULE_ERR; + listDelNode(moduleCommandFilters,ln); + + ln = listSearchKey(ctx->module->filters,filter); + if (ln) { + listDelNode(moduleCommandFilters,ln); + } + return REDISMODULE_OK; } @@ -4874,18 +4919,18 @@ void moduleCallCommandFilters(client *c) { /* Return the number of arguments a filtered command has. The number of * arguments include the command itself. */ -int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *filter) +int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx) { - return filter->argc; + return fctx->argc; } /* Return the specified command argument. The first argument (position 0) is * the command itself, and the rest are user-provided args. */ -const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *filter, int pos) +const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos) { - if (pos < 0 || pos >= filter->argc) return NULL; - return filter->argv[pos]; + if (pos < 0 || pos >= fctx->argc) return NULL; + return fctx->argv[pos]; } /* Modify the filtered command by inserting a new argument at the specified @@ -4894,18 +4939,18 @@ const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fil * allocated, freed or used elsewhere. */ -int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg) +int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) { int i; - if (pos < 0 || pos > filter->argc) return REDISMODULE_ERR; + if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR; - filter->argv = zrealloc(filter->argv, (filter->argc+1)*sizeof(RedisModuleString *)); - for (i = filter->argc; i > pos; i--) { - filter->argv[i] = filter->argv[i-1]; + fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *)); + for (i = fctx->argc; i > pos; i--) { + fctx->argv[i] = fctx->argv[i-1]; } - filter->argv[pos] = arg; - filter->argc++; + fctx->argv[pos] = arg; + fctx->argc++; return REDISMODULE_OK; } @@ -4916,12 +4961,12 @@ int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *filter, int pos, Redi * or used elsewhere. */ -int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg) +int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) { - if (pos < 0 || pos >= filter->argc) return REDISMODULE_ERR; + if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; - decrRefCount(filter->argv[pos]); - filter->argv[pos] = arg; + decrRefCount(fctx->argv[pos]); + fctx->argv[pos] = arg; return REDISMODULE_OK; } @@ -4929,16 +4974,16 @@ int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *filter, int pos, Red /* Modify the filtered command by deleting an argument at the specified * position. */ -int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *filter, int pos) +int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) { int i; - if (pos < 0 || pos >= filter->argc) return REDISMODULE_ERR; + if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; - decrRefCount(filter->argv[pos]); - for (i = pos; i < filter->argc-1; i++) { - filter->argv[i] = filter->argv[i+1]; + decrRefCount(fctx->argv[pos]); + for (i = pos; i < fctx->argc-1; i++) { + fctx->argv[i] = fctx->argv[i+1]; } - filter->argc--; + fctx->argc--; return REDISMODULE_OK; } @@ -5041,6 +5086,7 @@ void moduleLoadFromQueue(void) { void moduleFreeModuleStructure(struct RedisModule *module) { listRelease(module->types); + listRelease(module->filters); sdsfree(module->name); zfree(module); } @@ -5132,6 +5178,7 @@ int moduleUnload(sds name) { moduleUnregisterCommands(module); moduleUnregisterSharedAPI(module); moduleUnregisterUsedAPI(module); + moduleUnregisterFilters(module); /* Remove any notification subscribers this module might have */ moduleUnsubscribeNotifications(module); @@ -5396,6 +5443,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ExportSharedAPI); REGISTER_API(GetSharedAPI); REGISTER_API(RegisterCommandFilter); + REGISTER_API(UnregisterCommandFilter); REGISTER_API(CommandFilterArgsCount); REGISTER_API(CommandFilterArgGet); REGISTER_API(CommandFilterArgInsert); diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c index d5dd405aa..9cd440df2 100644 --- a/src/modules/hellofilter.c +++ b/src/modules/hellofilter.c @@ -7,10 +7,27 @@ static RedisModuleString *log_key_name; static const char log_command_name[] = "hellofilter.log"; static const char ping_command_name[] = "hellofilter.ping"; +static const char unregister_command_name[] = "hellofilter.unregister"; static int in_module = 0; +static RedisModuleCommandFilter *filter = NULL; + +int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + (void) argc; + (void) argv; + + RedisModule_ReplyWithLongLong(ctx, + RedisModule_UnregisterCommandFilter(ctx, filter)); + + return REDISMODULE_OK; +} + int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + (void) argc; + (void) argv; + RedisModuleCallReply *reply = RedisModule_Call(ctx, "ping", "c", "@log"); if (reply) { RedisModule_ReplyWithCallReply(ctx, reply); @@ -115,11 +132,15 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,ping_command_name, - HelloFilter_PingCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter) - == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,unregister_command_name, + HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter)) + == NULL) return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/src/redismodule.h b/src/redismodule.h index 5df83ae6a..37b7d0d59 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -151,6 +151,7 @@ typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; +typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -339,12 +340,13 @@ void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedC void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); -int REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb); -int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *filter); -const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *filter, int pos); -int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg); -int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *filter, int pos, RedisModuleString *arg); -int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *filter, int pos); +RedisModuleCommandFilter *REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb); +int REDISMODULE_API_FUNC(RedisModule_UnregisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *fctx); +const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *fctx, int pos); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos); #endif /* This is included inline inside each Redis module. */ @@ -508,6 +510,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ExportSharedAPI); REDISMODULE_GET_API(GetSharedAPI); REDISMODULE_GET_API(RegisterCommandFilter); + REDISMODULE_GET_API(UnregisterCommandFilter); REDISMODULE_GET_API(CommandFilterArgsCount); REDISMODULE_GET_API(CommandFilterArgGet); REDISMODULE_GET_API(CommandFilterArgInsert); diff --git a/tests/modules/commandfilter.tcl b/tests/modules/commandfilter.tcl index 47d9c302c..8645d8279 100644 --- a/tests/modules/commandfilter.tcl +++ b/tests/modules/commandfilter.tcl @@ -42,4 +42,26 @@ start_server {tags {"modules"}} { r eval "redis.call('hellofilter.ping')" 0 r lrange log-key 0 -1 } "{ping @log}" + + test {Command Filter is unregistered implicitly on module unload} { + r del log-key + r module unload hellofilter + r set mykey @log + r lrange log-key 0 -1 + } {} + + r module load $testmodule log-key-2 + + test {Command Filter unregister works as expected} { + # Validate reloading succeeded + r set mykey @log + assert_equal "{set mykey @log}" [r lrange log-key-2 0 -1] + + # Unregister + r hellofilter.unregister + r del log-key-2 + + r set mykey @log + r lrange log-key-2 0 -1 + } {} } From b8568a98fd39ecd9e540d74b79ee97a68b0ce8be Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Thu, 21 Mar 2019 19:45:41 +0200 Subject: [PATCH 14/17] CommandFilter API: fix UnregisterCommandFilter. --- src/module.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index ad7bba2eb..a54bd1ad8 100644 --- a/src/module.c +++ b/src/module.c @@ -4887,9 +4887,8 @@ int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *fi listDelNode(moduleCommandFilters,ln); ln = listSearchKey(ctx->module->filters,filter); - if (ln) { - listDelNode(moduleCommandFilters,ln); - } + if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */ + listDelNode(ctx->module->filters,ln); return REDISMODULE_OK; } From 898677d59e6b77a11aced5d68b32da7e94c075fe Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Thu, 21 Mar 2019 19:47:43 +0200 Subject: [PATCH 15/17] CommandFilter API: REDISMODULE_CMDFILTER_NOSELF. Add a flag to automatically protect filters from being called recursively by their own module. --- src/module.c | 28 +++++++++++++++++++++++++--- src/modules/hellofilter.c | 15 +++++++++------ src/redismodule.h | 7 ++++++- tests/modules/commandfilter.tcl | 27 ++++++++++++++++++++++----- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/src/module.c b/src/module.c index a54bd1ad8..4ff865d2c 100644 --- a/src/module.c +++ b/src/module.c @@ -50,6 +50,7 @@ struct RedisModule { list *usedby; /* List of modules using APIs from this one. */ list *using; /* List of modules we use some APIs of. */ list *filters; /* List of filters the module has registered. */ + int in_call; /* RM_Call() nesting level */ }; typedef struct RedisModule RedisModule; @@ -283,6 +284,8 @@ typedef struct RedisModuleCommandFilter { RedisModule *module; /* Filter callback function */ RedisModuleCommandFilterFunc callback; + /* REDISMODULE_CMDFILTER_* flags */ + int flags; } RedisModuleCommandFilter; /* Registered filters */ @@ -2756,6 +2759,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->db = ctx->client->db; c->argv = argv; c->argc = argc; + if (ctx->module) ctx->module->in_call++; + /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) goto cleanup; @@ -2822,6 +2827,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); cleanup: + if (ctx->module) ctx->module->in_call--; freeClient(c); return reply; } @@ -4857,17 +4863,27 @@ int moduleUnregisterFilters(RedisModule *module) { * * Note that in the above use case, if `MODULE.SET` itself uses * `RedisModule_Call()` the filter will be applied on that call as well. If - * that is not desired, the module itself is responsible for maintaining a flag - * to identify and avoid this form of re-entrancy. + * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when + * registering the filter. + * + * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that + * originate from the module's own `RM_Call()` from reaching the filter. This + * flag is effective for all execution flows, including nested ones, as long as + * the execution begins from the module's command context or a thread-safe + * context that is associated with a blocking command. + * + * Detached thread-safe contexts are *not* associated with the module and cannot + * be protected by this flag. * * If multiple filters are registered (by the same or different modules), they * are executed in the order of registration. */ -RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) { +RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) { RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter)); filter->module = ctx->module; filter->callback = callback; + filter->flags = flags; listAddNodeTail(moduleCommandFilters, filter); listAddNodeTail(ctx->module->filters, filter); @@ -4908,6 +4924,12 @@ void moduleCallCommandFilters(client *c) { while((ln = listNext(&li))) { RedisModuleCommandFilter *f = ln->value; + /* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is + * currently processing a command. + */ + if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue; + + /* Call filter */ f->callback(&filter); } diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c index 9cd440df2..448e12983 100644 --- a/src/modules/hellofilter.c +++ b/src/modules/hellofilter.c @@ -8,7 +8,7 @@ static RedisModuleString *log_key_name; static const char log_command_name[] = "hellofilter.log"; static const char ping_command_name[] = "hellofilter.ping"; static const char unregister_command_name[] = "hellofilter.unregister"; -static int in_module = 0; +static int in_log_command = 0; static RedisModuleCommandFilter *filter = NULL; @@ -57,7 +57,7 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar RedisModule_CloseKey(log); RedisModule_FreeString(ctx, s); - in_module = 1; + in_log_command = 1; size_t cmdlen; const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen); @@ -69,14 +69,14 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); } - in_module = 0; + in_log_command = 0; return REDISMODULE_OK; } void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) { - if (in_module) return; /* don't process our own RM_Call() */ + if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ /* Fun manipulations: * - Remove @delme @@ -120,12 +120,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (argc != 1) { + if (argc != 2) { RedisModule_Log(ctx, "warning", "Log key name not specified"); return REDISMODULE_ERR; } + long long noself = 0; log_key_name = RedisModule_CreateStringFromString(ctx, argv[0]); + RedisModule_StringToLongLong(argv[1], &noself); if (RedisModule_CreateCommand(ctx,log_command_name, HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) @@ -139,7 +141,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter)) + if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, + noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) == NULL) return REDISMODULE_ERR; return REDISMODULE_OK; diff --git a/src/redismodule.h b/src/redismodule.h index 37b7d0d59..e567743a4 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -132,6 +132,11 @@ * of timers that are going to expire, sorted by expire time. */ typedef uint64_t RedisModuleTimerID; +/* CommandFilter Flags */ + +/* Do filter RedisModule_Call() commands initiated by module itself. */ +#define REDISMODULE_CMDFILTER_NOSELF (1<<0) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -340,7 +345,7 @@ void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedC void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); -RedisModuleCommandFilter *REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb); +RedisModuleCommandFilter *REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb, int flags); int REDISMODULE_API_FUNC(RedisModule_UnregisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter); int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *fctx); const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *fctx, int pos); diff --git a/tests/modules/commandfilter.tcl b/tests/modules/commandfilter.tcl index 8645d8279..1e5c41d2b 100644 --- a/tests/modules/commandfilter.tcl +++ b/tests/modules/commandfilter.tcl @@ -1,7 +1,7 @@ set testmodule [file normalize src/modules/hellofilter.so] start_server {tags {"modules"}} { - r module load $testmodule log-key + r module load $testmodule log-key 0 test {Command Filter handles redirected commands} { r set mykey @log @@ -50,18 +50,35 @@ start_server {tags {"modules"}} { r lrange log-key 0 -1 } {} - r module load $testmodule log-key-2 + r module load $testmodule log-key 0 test {Command Filter unregister works as expected} { # Validate reloading succeeded + r del log-key r set mykey @log - assert_equal "{set mykey @log}" [r lrange log-key-2 0 -1] + assert_equal "{set mykey @log}" [r lrange log-key 0 -1] # Unregister r hellofilter.unregister - r del log-key-2 + r del log-key r set mykey @log - r lrange log-key-2 0 -1 + r lrange log-key 0 -1 } {} + + r module unload hellofilter + r module load $testmodule log-key 1 + + test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { + r set mykey @log + assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + + r del log-key + r hellofilter.ping + assert_equal {} [r lrange log-key 0 -1] + + r eval "redis.call('hellofilter.ping')" 0 + assert_equal {} [r lrange log-key 0 -1] + } + } From eb40ac6c8e1617962fc1b0d8fcc9f82e3ab0227c Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 21 Mar 2017 07:20:02 -0700 Subject: [PATCH 16/17] diskless fork kept streaming RDB to a disconnected slave --- src/networking.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/networking.c b/src/networking.c index c08f43e6a..09cbff387 100644 --- a/src/networking.c +++ b/src/networking.c @@ -911,6 +911,16 @@ void unlinkClient(client *c) { c->client_list_node = NULL; } + /* In the case of diskless replication the fork is writing to the + * sockets and just closing the fd isn't enough, if we don't also + * shutdown the socket the fork will continue to write to the slave + * and the salve will only find out that it was disconnected when + * it will finish reading the rdb. */ + if ((c->flags & CLIENT_SLAVE) && + (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) { + shutdown(c->fd, SHUT_RDWR); + } + /* Unregister async I/O handlers and close the socket. */ aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); From 1a24f23a50095d95beba39536d59d4e777418252 Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Thu, 21 Mar 2019 20:33:11 +0200 Subject: [PATCH 17/17] Renamed event name from "miss" to "keymiss" --- src/db.c | 8 ++++---- src/modules/testmodule.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/db.c b/src/db.c index afe181281..b537a29a4 100644 --- a/src/db.c +++ b/src/db.c @@ -83,7 +83,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * 1. A key gets expired if it reached it's TTL. * 2. The key last access time is updated. * 3. The global keys hits/misses stats are updated (reported in INFO). - * 4. If keyspace notifications are enabled, a "miss" notification is fired. + * 4. If keyspace notifications are enabled, a "keymiss" notification is fired. * * This API should not be used when we write to the key after obtaining * the object linked to the key, but only for read only operations. @@ -107,7 +107,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * to return NULL ASAP. */ if (server.masterhost == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); return NULL; } @@ -129,14 +129,14 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); return NULL; } } val = lookupKey(db,key,flags); if (val == NULL) { server.stat_keyspace_misses++; - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "miss", key, db->id); + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); } else server.stat_keyspace_hits++; diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index af78d21d7..5381380e5 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -188,7 +188,7 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); - /* Miss some keys intentionally so we will get a "miss" notification. */ + /* Miss some keys intentionally so we will get a "keymiss" notification. */ RedisModule_Call(ctx, "GET", "c", "nosuchkey"); RedisModule_Call(ctx, "SMEMBERS", "c", "nosuchkey");