diff --git a/src/db.c b/src/db.c index c46b6de18..9696150d7 100644 --- a/src/db.c +++ b/src/db.c @@ -83,6 +83,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * 1. A key gets expired if it reached it's TTL. * 2. The key last access time is updated. * 3. The global keys hits/misses stats are updated (reported in INFO). + * 4. If keyspace notifications are enabled, a "keymiss" notification is fired. * * This API should not be used when we write to the key after obtaining * the object linked to the key, but only for read only operations. @@ -107,6 +108,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * to return NULL ASAP. */ if (server.masterhost == NULL) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); return NULL; } @@ -128,12 +130,15 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { server.current_client->cmd->flags & CMD_READONLY) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); return NULL; } } val = lookupKey(db,key,flags); - if (val == NULL) + if (val == NULL) { server.stat_keyspace_misses++; + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + } else server.stat_keyspace_hits++; return val; diff --git a/src/module.c b/src/module.c index 4e4a584af..3325e25ed 100644 --- a/src/module.c +++ b/src/module.c @@ -49,6 +49,8 @@ struct RedisModule { list *types; /* Module data types. */ list *usedby; /* List of modules using APIs from this one. */ list *using; /* List of modules we use some APIs of. */ + list *filters; /* List of filters the module has registered. */ + int in_call; /* RM_Call() nesting level */ }; typedef struct RedisModule RedisModule; @@ -270,6 +272,25 @@ typedef struct RedisModuleDictIter { raxIterator ri; } RedisModuleDictIter; +typedef struct RedisModuleCommandFilterCtx { + RedisModuleString **argv; + int argc; +} RedisModuleCommandFilterCtx; + +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); + +typedef struct RedisModuleCommandFilter { + /* The module that registered the filter */ + RedisModule *module; + /* Filter callback function */ + RedisModuleCommandFilterFunc callback; + /* REDISMODULE_CMDFILTER_* flags */ + int flags; +} RedisModuleCommandFilter; + +/* Registered filters */ +static list *moduleCommandFilters; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -732,6 +753,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->types = listCreate(); module->usedby = listCreate(); module->using = listCreate(); + module->filters = listCreate(); ctx->module = module; } @@ -2725,12 +2747,6 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch RedisModuleCallReply *reply = NULL; int replicate = 0; /* Replicate this command? */ - cmd = lookupCommandByCString((char*)cmdname); - if (!cmd) { - errno = EINVAL; - return NULL; - } - /* Create the client and dispatch the command. */ va_start(ap, fmt); c = createClient(-1, IDX_EVENT_LOOP_MAIN); @@ -2744,11 +2760,25 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->db = ctx->client->db; c->argv = argv; c->argc = argc; - c->cmd = c->lastcmd = cmd; + if (ctx->module) ctx->module->in_call++; + /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) goto cleanup; + /* Call command filters */ + moduleCallCommandFilters(c); + + /* Lookup command now, after filters had a chance to make modifications + * if necessary. + */ + cmd = lookupCommand(ptrFromObj(c->argv[0])); + if (!cmd) { + errno = EINVAL; + goto cleanup; + } + c->cmd = c->lastcmd = cmd; + /* Basic arity checks. */ if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { errno = EINVAL; @@ -2798,6 +2828,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); cleanup: + if (ctx->module) ctx->module->in_call--; freeClient(c); return reply; } @@ -4808,6 +4839,214 @@ int moduleUnregisterUsedAPI(RedisModule *module) { return count; } +/* Unregister all filters registered by a module. + * This is called when a module is being unloaded. + * + * Returns the number of filters unregistered. */ +int moduleUnregisterFilters(RedisModule *module) { + listIter li; + listNode *ln; + int count = 0; + + listRewind(module->filters,&li); + while((ln = listNext(&li))) { + RedisModuleCommandFilter *filter = ln->value; + listNode *ln = listSearchKey(moduleCommandFilters,filter); + if (ln) { + listDelNode(moduleCommandFilters,ln); + count++; + } + zfree(filter); + } + return count; +} + +/* -------------------------------------------------------------------------- + * Module Command Filter API + * -------------------------------------------------------------------------- */ + +/* Register a new command filter function. + * + * Command filtering makes it possible for modules to extend Redis by plugging + * into the execution flow of all commands. + * + * A registered filter gets called before Redis executes *any* command. This + * includes both core Redis commands and commands registered by any module. The + * filter applies in all execution paths including: + * + * 1. Invocation by a client. + * 2. Invocation through `RedisModule_Call()` by any module. + * 3. Invocation through Lua 'redis.call()`. + * 4. Replication of a command from a master. + * + * The filter executes in a special filter context, which is different and more + * limited than a RedisModuleCtx. Because the filter affects any command, it + * must be implemented in a very efficient way to reduce the performance impact + * on Redis. All Redis Module API calls that require a valid context (such as + * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a + * filter context. + * + * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the + * executed command and its arguments. As the filter executes before Redis + * begins processing the command, any change will affect the way the command is + * processed. For example, a module can override Redis commands this way: + * + * 1. Register a `MODULE.SET` command which implements an extended version of + * the Redis `SET` command. + * 2. Register a command filter which detects invocation of `SET` on a specific + * pattern of keys. Once detected, the filter will replace the first + * argument from `SET` to `MODULE.SET`. + * 3. When filter execution is complete, Redis considers the new command name + * and therefore executes the module's own command. + * + * Note that in the above use case, if `MODULE.SET` itself uses + * `RedisModule_Call()` the filter will be applied on that call as well. If + * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when + * registering the filter. + * + * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that + * originate from the module's own `RM_Call()` from reaching the filter. This + * flag is effective for all execution flows, including nested ones, as long as + * the execution begins from the module's command context or a thread-safe + * context that is associated with a blocking command. + * + * Detached thread-safe contexts are *not* associated with the module and cannot + * be protected by this flag. + * + * If multiple filters are registered (by the same or different modules), they + * are executed in the order of registration. + */ + +RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) { + RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter), MALLOC_LOCAL); + filter->module = ctx->module; + filter->callback = callback; + filter->flags = flags; + + listAddNodeTail(moduleCommandFilters, filter); + listAddNodeTail(ctx->module->filters, filter); + return filter; +} + +/* Unregister a command filter. + */ +int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) { + listNode *ln; + + /* A module can only remove its own filters */ + if (filter->module != ctx->module) return REDISMODULE_ERR; + + ln = listSearchKey(moduleCommandFilters,filter); + if (!ln) return REDISMODULE_ERR; + listDelNode(moduleCommandFilters,ln); + + ln = listSearchKey(ctx->module->filters,filter); + if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */ + listDelNode(ctx->module->filters,ln); + + return REDISMODULE_OK; +} + +void moduleCallCommandFilters(client *c) { + if (listLength(moduleCommandFilters) == 0) return; + + listIter li; + listNode *ln; + listRewind(moduleCommandFilters,&li); + + RedisModuleCommandFilterCtx filter = { + .argv = c->argv, + .argc = c->argc + }; + + while((ln = listNext(&li))) { + RedisModuleCommandFilter *f = ln->value; + + /* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is + * currently processing a command. + */ + if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue; + + /* Call filter */ + f->callback(&filter); + } + + c->argv = filter.argv; + c->argc = filter.argc; +} + +/* Return the number of arguments a filtered command has. The number of + * arguments include the command itself. + */ +int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx) +{ + return fctx->argc; +} + +/* Return the specified command argument. The first argument (position 0) is + * the command itself, and the rest are user-provided args. + */ +const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos) +{ + if (pos < 0 || pos >= fctx->argc) return NULL; + return fctx->argv[pos]; +} + +/* Modify the filtered command by inserting a new argument at the specified + * position. The specified RedisModuleString argument may be used by Redis + * after the filter context is destroyed, so it must not be auto-memory + * allocated, freed or used elsewhere. + */ + +int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) +{ + int i; + + if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR; + + fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *), MALLOC_LOCAL); + for (i = fctx->argc; i > pos; i--) { + fctx->argv[i] = fctx->argv[i-1]; + } + fctx->argv[pos] = arg; + fctx->argc++; + + return REDISMODULE_OK; +} + +/* Modify the filtered command by replacing an existing argument with a new one. + * The specified RedisModuleString argument may be used by Redis after the + * filter context is destroyed, so it must not be auto-memory allocated, freed + * or used elsewhere. + */ + +int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) +{ + if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; + + decrRefCount(fctx->argv[pos]); + fctx->argv[pos] = arg; + + return REDISMODULE_OK; +} + +/* Modify the filtered command by deleting an argument at the specified + * position. + */ +int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) +{ + int i; + if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR; + + decrRefCount(fctx->argv[pos]); + for (i = pos; i < fctx->argc-1; i++) { + fctx->argv[i] = fctx->argv[i+1]; + } + fctx->argc--; + + return REDISMODULE_OK; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -4854,6 +5093,9 @@ void moduleInitModulesSystem(void) { moduleFreeContextReusedClient->flags |= CLIENT_MODULE; moduleFreeContextReusedClient->puser = NULL; /* root user. */ + /* Set up filter list */ + moduleCommandFilters = listCreate(); + moduleRegisterCoreAPI(); if (pipe(server.module_blocked_pipe) == -1) { serverLog(LL_WARNING, @@ -4904,6 +5146,7 @@ void moduleLoadFromQueue(void) { void moduleFreeModuleStructure(struct RedisModule *module) { listRelease(module->types); + listRelease(module->filters); sdsfree(module->name); zfree(module); } @@ -4995,6 +5238,7 @@ int moduleUnload(sds name) { moduleUnregisterCommands(module); moduleUnregisterSharedAPI(module); moduleUnregisterUsedAPI(module); + moduleUnregisterFilters(module); /* Remove any notification subscribers this module might have */ moduleUnsubscribeNotifications(module); @@ -5259,4 +5503,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DictCompare); REGISTER_API(ExportSharedAPI); REGISTER_API(GetSharedAPI); + REGISTER_API(RegisterCommandFilter); + REGISTER_API(UnregisterCommandFilter); + REGISTER_API(CommandFilterArgsCount); + REGISTER_API(CommandFilterArgGet); + REGISTER_API(CommandFilterArgInsert); + REGISTER_API(CommandFilterArgReplace); + REGISTER_API(CommandFilterArgDelete); } diff --git a/src/modules/Makefile b/src/modules/Makefile index 51ffac17d..537aa0daf 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so +all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -46,6 +46,10 @@ hellotimer.so: hellotimer.xo hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo + +hellofilter.xo: ../redismodule.h + +hellofilter.so: hellofilter.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc testmodule.xo: ../redismodule.h diff --git a/src/modules/hellofilter.c b/src/modules/hellofilter.c new file mode 100644 index 000000000..448e12983 --- /dev/null +++ b/src/modules/hellofilter.c @@ -0,0 +1,149 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "../redismodule.h" + +#include + +static RedisModuleString *log_key_name; + +static const char log_command_name[] = "hellofilter.log"; +static const char ping_command_name[] = "hellofilter.ping"; +static const char unregister_command_name[] = "hellofilter.unregister"; +static int in_log_command = 0; + +static RedisModuleCommandFilter *filter = NULL; + +int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + (void) argc; + (void) argv; + + RedisModule_ReplyWithLongLong(ctx, + RedisModule_UnregisterCommandFilter(ctx, filter)); + + return REDISMODULE_OK; +} + +int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + (void) argc; + (void) argv; + + RedisModuleCallReply *reply = RedisModule_Call(ctx, "ping", "c", "@log"); + if (reply) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + } else { + RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); + } + + return REDISMODULE_OK; +} + +int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); + + int i; + for (i = 1; i < argc; i++) { + size_t arglen; + const char *arg = RedisModule_StringPtrLen(argv[i], &arglen); + + if (i > 1) RedisModule_StringAppendBuffer(ctx, s, " ", 1); + RedisModule_StringAppendBuffer(ctx, s, arg, arglen); + } + + RedisModuleKey *log = RedisModule_OpenKey(ctx, log_key_name, REDISMODULE_WRITE|REDISMODULE_READ); + RedisModule_ListPush(log, REDISMODULE_LIST_HEAD, s); + RedisModule_CloseKey(log); + RedisModule_FreeString(ctx, s); + + in_log_command = 1; + + size_t cmdlen; + const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen); + RedisModuleCallReply *reply = RedisModule_Call(ctx, cmdname, "v", &argv[2], argc - 2); + if (reply) { + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + } else { + RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments"); + } + + in_log_command = 0; + + return REDISMODULE_OK; +} + +void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) +{ + if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ + + /* Fun manipulations: + * - Remove @delme + * - Replace @replaceme + * - Append @insertbefore or @insertafter + * - Prefix with Log command if @log encounterd + */ + int log = 0; + int pos = 0; + while (pos < RedisModule_CommandFilterArgsCount(filter)) { + const RedisModuleString *arg = RedisModule_CommandFilterArgGet(filter, pos); + size_t arg_len; + const char *arg_str = RedisModule_StringPtrLen(arg, &arg_len); + + if (arg_len == 6 && !memcmp(arg_str, "@delme", 6)) { + RedisModule_CommandFilterArgDelete(filter, pos); + continue; + } + if (arg_len == 10 && !memcmp(arg_str, "@replaceme", 10)) { + RedisModule_CommandFilterArgReplace(filter, pos, + RedisModule_CreateString(NULL, "--replaced--", 12)); + } else if (arg_len == 13 && !memcmp(arg_str, "@insertbefore", 13)) { + RedisModule_CommandFilterArgInsert(filter, pos, + RedisModule_CreateString(NULL, "--inserted-before--", 19)); + pos++; + } else if (arg_len == 12 && !memcmp(arg_str, "@insertafter", 12)) { + RedisModule_CommandFilterArgInsert(filter, pos + 1, + RedisModule_CreateString(NULL, "--inserted-after--", 18)); + pos++; + } else if (arg_len == 4 && !memcmp(arg_str, "@log", 4)) { + log = 1; + } + pos++; + } + + if (log) RedisModule_CommandFilterArgInsert(filter, 0, + RedisModule_CreateString(NULL, log_command_name, sizeof(log_command_name)-1)); +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (argc != 2) { + RedisModule_Log(ctx, "warning", "Log key name not specified"); + return REDISMODULE_ERR; + } + + long long noself = 0; + log_key_name = RedisModule_CreateStringFromString(ctx, argv[0]); + RedisModule_StringToLongLong(argv[1], &noself); + + if (RedisModule_CreateCommand(ctx,log_command_name, + HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,ping_command_name, + HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,unregister_command_name, + HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, + noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) + == NULL) return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c index 67a861704..5381380e5 100644 --- a/src/modules/testmodule.c +++ b/src/modules/testmodule.c @@ -109,9 +109,9 @@ int TestStringPrintf(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 3) { return RedisModule_WrongArity(ctx); } - RedisModuleString *s = RedisModule_CreateStringPrintf(ctx, - "Got %d args. argv[1]: %s, argv[2]: %s", - argc, + RedisModuleString *s = RedisModule_CreateStringPrintf(ctx, + "Got %d args. argv[1]: %s, argv[2]: %s", + argc, RedisModule_StringPtrLen(argv[1], NULL), RedisModule_StringPtrLen(argv[2], NULL) ); @@ -133,7 +133,7 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleKey *k = RedisModule_OpenKey(ctx, RedisModule_CreateStringPrintf(ctx, "unlinked"), REDISMODULE_WRITE | REDISMODULE_READ); if (!k) return failTest(ctx, "Could not create key"); - + if (REDISMODULE_ERR == RedisModule_StringSet(k, RedisModule_CreateStringPrintf(ctx, "Foobar"))) { return failTest(ctx, "Could not set string value"); } @@ -152,7 +152,7 @@ int TestUnlink(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return failTest(ctx, "Could not verify key to be unlinked"); } return RedisModule_ReplyWithSimpleString(ctx, "OK"); - + } int NotifyCallback(RedisModuleCtx *ctx, int type, const char *event, @@ -188,6 +188,10 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); + /* Miss some keys intentionally so we will get a "keymiss" notification. */ + RedisModule_Call(ctx, "GET", "c", "nosuchkey"); + RedisModule_Call(ctx, "SMEMBERS", "c", "nosuchkey"); + size_t sz; const char *rep; RedisModuleCallReply *r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo"); @@ -225,6 +229,16 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { FAIL("Wrong reply for l"); } + r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "nosuchkey"); + if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) { + FAIL("Wrong or no reply for nosuchkey"); + } else { + rep = RedisModule_CallReplyStringPtr(r, &sz); + if (sz != 1 || *rep != '2') { + FAIL("Got reply '%.*s'. expected '2'", sz, rep); + } + } + RedisModule_Call(ctx, "FLUSHDB", ""); return RedisModule_ReplyWithSimpleString(ctx, "OK"); @@ -423,7 +437,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"test.ctxflags", TestCtxFlags,"readonly",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - + if (RedisModule_CreateCommand(ctx,"test.unlink", TestUnlink,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; @@ -435,7 +449,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_SET | - REDISMODULE_NOTIFY_STRING, + REDISMODULE_NOTIFY_STRING | + REDISMODULE_NOTIFY_KEY_MISS, NotifyCallback); if (RedisModule_CreateCommand(ctx,"test.notify", TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) diff --git a/src/networking.cpp b/src/networking.cpp index 726fc7ee2..036dca456 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1211,6 +1211,16 @@ void unlinkClient(client *c) { c->client_list_node = NULL; } + /* In the case of diskless replication the fork is writing to the + * sockets and just closing the fd isn't enough, if we don't also + * shutdown the socket the fork will continue to write to the slave + * and the salve will only find out that it was disconnected when + * it will finish reading the rdb. */ + if ((c->flags & CLIENT_SLAVE) && + (c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) { + shutdown(c->fd, SHUT_RDWR); + } + /* Unregister async I/O handlers and close the socket. */ aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE); aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); diff --git a/src/notify.c b/src/notify.c index e7063a626..2213c5e9a 100644 --- a/src/notify.c +++ b/src/notify.c @@ -55,6 +55,7 @@ int keyspaceEventsStringToFlags(char *classes) { case 'K': flags |= NOTIFY_KEYSPACE; break; case 'E': flags |= NOTIFY_KEYEVENT; break; case 't': flags |= NOTIFY_STREAM; break; + case 'm': flags |= NOTIFY_KEY_MISS; break; default: return -1; } } @@ -81,6 +82,7 @@ sds keyspaceEventsFlagsToString(int flags) { if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); + if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1); } if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); @@ -100,12 +102,12 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) { int len = -1; char buf[24]; - /* If any modules are interested in events, notify the module system now. + /* If any modules are interested in events, notify the module system now. * This bypasses the notifications configuration, but the module engine * will only call event subscribers if the event type matches the types * they are interested in. */ moduleNotifyKeyspaceEvent(type, event, key, dbid); - + /* If notifications for this class of events are off, return ASAP. */ if (!(server.notify_keyspace_events & type)) return; diff --git a/src/redismodule.h b/src/redismodule.h index 02941aa96..259a5f1db 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -98,7 +98,8 @@ #define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ -#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ +#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */ +#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */ /* A special pointer that we can use between the core and the module to signal @@ -132,6 +133,11 @@ * of timers that are going to expire, sorted by expire time. */ typedef uint64_t RedisModuleTimerID; +/* CommandFilter Flags */ + +/* Do filter RedisModule_Call() commands initiated by module itself. */ +#define REDISMODULE_CMDFILTER_NOSELF (1<<0) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -150,6 +156,8 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDictIter RedisModuleDictIter; +typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; +typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -162,6 +170,7 @@ typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); +typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { @@ -338,6 +347,13 @@ void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedC void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); +RedisModuleCommandFilter *REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb, int flags); +int REDISMODULE_API_FUNC(RedisModule_UnregisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *fctx); +const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *fctx, int pos); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); +int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos); #endif /* This is included inline inside each Redis module. */ @@ -501,6 +517,13 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SetClusterFlags); REDISMODULE_GET_API(ExportSharedAPI); REDISMODULE_GET_API(GetSharedAPI); + REDISMODULE_GET_API(RegisterCommandFilter); + REDISMODULE_GET_API(UnregisterCommandFilter); + REDISMODULE_GET_API(CommandFilterArgsCount); + REDISMODULE_GET_API(CommandFilterArgGet); + REDISMODULE_GET_API(CommandFilterArgInsert); + REDISMODULE_GET_API(CommandFilterArgReplace); + REDISMODULE_GET_API(CommandFilterArgDelete); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/replication.cpp b/src/replication.cpp index fd1c6301c..1a1634815 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -651,6 +651,7 @@ int startBgsaveForReplication(int mincapa) { client *slave = (client*)ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + slave->replstate = REPL_STATE_NONE; slave->flags &= ~CLIENT_SLAVE; listDelNode(server.slaves,ln); addReplyError(slave, diff --git a/src/scripting.cpp b/src/scripting.cpp index b23c5dc5a..08dbedeb9 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -474,6 +474,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = argc; c->puser = server.lua_caller->puser; + /* Process module hooks */ + moduleCallCommandFilters(c); + argv = c->argv; + argc = c->argc; + /* Log the command if debugging is active. */ if (ldb.active && ldb.step) { sds cmdlog = sdsnew(""); diff --git a/src/server.cpp b/src/server.cpp index 14f4d0832..bf39a96b8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3387,6 +3387,8 @@ void call(client *c, int flags) { * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { serverAssert(GlobalLocksAcquired()); + moduleCallCommandFilters(c); + /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in diff --git a/src/server.h b/src/server.h index 26a9dac00..58db1d0ba 100644 --- a/src/server.h +++ b/src/server.h @@ -490,7 +490,8 @@ extern "C" { #define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_STREAM (1<<10) /* t */ -#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ +#define NOTIFY_KEY_MISS (1<<11) /* m */ +#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */ /* Get the first bind addr or NULL */ #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) @@ -1570,6 +1571,7 @@ size_t moduleCount(void); void moduleAcquireGIL(int fServerThread); void moduleReleaseGIL(int fServerThread); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); +void moduleCallCommandFilters(client *c); /* Utils */ long long ustime(void); diff --git a/tests/modules/commandfilter.tcl b/tests/modules/commandfilter.tcl new file mode 100644 index 000000000..1e5c41d2b --- /dev/null +++ b/tests/modules/commandfilter.tcl @@ -0,0 +1,84 @@ +set testmodule [file normalize src/modules/hellofilter.so] + +start_server {tags {"modules"}} { + r module load $testmodule log-key 0 + + test {Command Filter handles redirected commands} { + r set mykey @log + r lrange log-key 0 -1 + } "{set mykey @log}" + + test {Command Filter can call RedisModule_CommandFilterArgDelete} { + r rpush mylist elem1 @delme elem2 + r lrange mylist 0 -1 + } {elem1 elem2} + + test {Command Filter can call RedisModule_CommandFilterArgInsert} { + r del mylist + r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 + r lrange mylist 0 -1 + } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} + + test {Command Filter can call RedisModule_CommandFilterArgReplace} { + r del mylist + r rpush mylist elem1 @replaceme elem2 + r lrange mylist 0 -1 + } {elem1 --replaced-- elem2} + + test {Command Filter applies on RM_Call() commands} { + r del log-key + r hellofilter.ping + r lrange log-key 0 -1 + } "{ping @log}" + + test {Command Filter applies on Lua redis.call()} { + r del log-key + r eval "redis.call('ping', '@log')" 0 + r lrange log-key 0 -1 + } "{ping @log}" + + test {Command Filter applies on Lua redis.call() that calls a module} { + r del log-key + r eval "redis.call('hellofilter.ping')" 0 + r lrange log-key 0 -1 + } "{ping @log}" + + test {Command Filter is unregistered implicitly on module unload} { + r del log-key + r module unload hellofilter + r set mykey @log + r lrange log-key 0 -1 + } {} + + r module load $testmodule log-key 0 + + test {Command Filter unregister works as expected} { + # Validate reloading succeeded + r del log-key + r set mykey @log + assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + + # Unregister + r hellofilter.unregister + r del log-key + + r set mykey @log + r lrange log-key 0 -1 + } {} + + r module unload hellofilter + r module load $testmodule log-key 1 + + test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { + r set mykey @log + assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + + r del log-key + r hellofilter.ping + assert_equal {} [r lrange log-key 0 -1] + + r eval "redis.call('hellofilter.ping')" 0 + assert_equal {} [r lrange log-key 0 -1] + } + +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 568eacdee..d2f281526 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,6 +63,7 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf + modules/commandfilter } # Index to the next test to run in the ::all_tests list. set ::next_test 0