Merge github.com:antirez/redis into unstable

This commit is contained in:
vattezhang 2019-04-26 18:47:11 +08:00
commit dd038a522f
38 changed files with 1026 additions and 118 deletions

View File

@ -34,7 +34,21 @@ Redis Manifesto
so that the complexity is obvious and more complex operations can be so that the complexity is obvious and more complex operations can be
performed as the sum of the basic operations. performed as the sum of the basic operations.
4 - Code is like a poem; it's not just something we write to reach some 4 - We believe in code efficiency. Computers get faster and faster, yet we
believe that abusing computing capabilities is not wise: the amount of
operations you can do for a given amount of energy remains anyway a
significant parameter: it allows to do more with less computers and, at
the same time, having a smaller environmental impact. Similarly Redis is
able to "scale down" to smaller devices. It is perfectly usable in a
Raspberry Pi and other small ARM based computers. Faster code having
just the layers of abstractions that are really needed will also result,
often, in more predictable performances. We think likewise about memory
usage, one of the fundamental goals of the Redis project is to
incrementally build more and more memory efficient data structures, so that
problems that were not approachable in RAM in the past will be perfectly
fine to handle in the future.
5 - Code is like a poem; it's not just something we write to reach some
practical result. Sometimes people that are far from the Redis philosophy practical result. Sometimes people that are far from the Redis philosophy
suggest using other code written by other authors (frequently in other suggest using other code written by other authors (frequently in other
languages) in order to implement something Redis currently lacks. But to us languages) in order to implement something Redis currently lacks. But to us
@ -45,23 +59,48 @@ Redis Manifesto
when needed. At the same time, when writing the Redis story we're trying to when needed. At the same time, when writing the Redis story we're trying to
write smaller stories that will fit in to other code. write smaller stories that will fit in to other code.
5 - We're against complexity. We believe designing systems is a fight against 6 - We're against complexity. We believe designing systems is a fight against
complexity. We'll accept to fight the complexity when it's worthwhile but complexity. We'll accept to fight the complexity when it's worthwhile but
we'll try hard to recognize when a small feature is not worth 1000s of lines we'll try hard to recognize when a small feature is not worth 1000s of lines
of code. Most of the time the best way to fight complexity is by not of code. Most of the time the best way to fight complexity is by not
creating it at all. creating it at all. Complexity is also a form of lock-in: code that is
very hard to understand cannot be modified by users in an independent way
regardless of the license. One of the main Redis goals is to remain
understandable, enough for a single programmer to have a clear idea of how
it works in detail just reading the source code for a couple of weeks.
6 - Two levels of API. The Redis API has two levels: 1) a subset of the API fits 7 - Threading is not a silver bullet. Instead of making Redis threaded we
believe on the idea of an efficient (mostly) single threaded Redis core.
Multiple of such cores, that may run in the same computer or may run
in multiple computers, are abstracted away as a single big system by
higher order protocols and features: Redis Cluster and the upcoming
Redis Proxy are our main goals. A shared nothing approach is not just
much simpler (see the previous point in this document), is also optimal
in NUMA systems. In the specific case of Redis it allows for each instance
to have a more limited amount of data, making the Redis persist-by-fork
approach more sounding. In the future we may explore parallelism only for
I/O, which is the low hanging fruit: minimal complexity could provide an
improved single process experience.
8 - Two levels of API. The Redis API has two levels: 1) a subset of the API fits
naturally into a distributed version of Redis and 2) a more complex API that naturally into a distributed version of Redis and 2) a more complex API that
supports multi-key operations. Both are useful if used judiciously but supports multi-key operations. Both are useful if used judiciously but
there's no way to make the more complex multi-keys API distributed in an there's no way to make the more complex multi-keys API distributed in an
opaque way without violating our other principles. We don't want to provide opaque way without violating our other principles. We don't want to provide
the illusion of something that will work magically when actually it can't in the illusion of something that will work magically when actually it can't in
all cases. Instead we'll provide commands to quickly migrate keys from one all cases. Instead we'll provide commands to quickly migrate keys from one
instance to another to perform multi-key operations and expose the tradeoffs instance to another to perform multi-key operations and expose the
to the user. trade-offs to the user.
7 - We optimize for joy. We believe writing code is a lot of hard work, and the 9 - We optimize for joy. We believe writing code is a lot of hard work, and the
only way it can be worth is by enjoying it. When there is no longer joy in only way it can be worth is by enjoying it. When there is no longer joy in
writing code, the best thing to do is stop. To prevent this, we'll avoid writing code, the best thing to do is stop. To prevent this, we'll avoid
taking paths that will make Redis less of a joy to develop. taking paths that will make Redis less of a joy to develop.
10 - All the above points are put together in what we call opportunistic
programming: trying to get the most for the user with minimal increases
in complexity (hanging fruits). Solve 95% of the problem with 5% of the
code when it is acceptable. Avoid a fixed schedule but follow the flow of
user requests, inspiration, Redis internal readiness for certain features
(sometimes many past changes reach a critical point making a previously
complex feature very easy to obtain).

16
runtest-moduleapi Executable file
View File

@ -0,0 +1,16 @@
#!/bin/sh
TCL_VERSIONS="8.5 8.6"
TCLSH=""
for VERSION in $TCL_VERSIONS; do
TCL=`which tclsh$VERSION 2>/dev/null` && TCLSH=$TCL
done
if [ -z $TCLSH ]
then
echo "You need tcl 8.5 or newer in order to run the Redis test"
exit 1
fi
make -C tests/modules && \
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}"

View File

@ -542,6 +542,8 @@ struct redisCommand *ACLLookupCommand(const char *name) {
* and command ID. */ * and command ID. */
void ACLResetSubcommandsForCommand(user *u, unsigned long id) { void ACLResetSubcommandsForCommand(user *u, unsigned long id) {
if (u->allowed_subcommands && u->allowed_subcommands[id]) { if (u->allowed_subcommands && u->allowed_subcommands[id]) {
for (int i = 0; u->allowed_subcommands[id][i]; i++)
sdsfree(u->allowed_subcommands[id][i]);
zfree(u->allowed_subcommands[id]); zfree(u->allowed_subcommands[id]);
u->allowed_subcommands[id] = NULL; u->allowed_subcommands[id] = NULL;
} }

View File

@ -1239,7 +1239,7 @@ int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io; RedisModuleIO io;
moduleValue *mv = o->ptr; moduleValue *mv = o->ptr;
moduleType *mt = mv->type; moduleType *mt = mv->type;
moduleInitIOContext(io,mt,r); moduleInitIOContext(io,mt,r,key);
mt->aof_rewrite(&io,key,mv->value); mt->aof_rewrite(&io,key,mv->value);
if (io.ctx) { if (io.ctx) {
moduleFreeContext(io.ctx); moduleFreeContext(io.ctx);

View File

@ -4776,7 +4776,7 @@ NULL
/* Generates a DUMP-format representation of the object 'o', adding it to the /* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */ * io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o) { void createDumpPayload(rio *payload, robj *o, robj *key) {
unsigned char buf[2]; unsigned char buf[2];
uint64_t crc; uint64_t crc;
@ -4784,7 +4784,7 @@ void createDumpPayload(rio *payload, robj *o) {
* byte followed by the serialized object. This is understood by RESTORE. */ * byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload,sdsempty()); rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o)); serverAssert(rdbSaveObject(payload,o,key));
/* Write the footer, this is how it looks like: /* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+ * ----------------+---------------------+---------------+
@ -4842,7 +4842,7 @@ void dumpCommand(client *c) {
} }
/* Create the DUMP encoded representation. */ /* Create the DUMP encoded representation. */
createDumpPayload(&payload,o); createDumpPayload(&payload,o,c->argv[1]);
/* Transfer to the client */ /* Transfer to the client */
dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr);
@ -4915,7 +4915,7 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,c->argv[3]->ptr); rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) || if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload)) == NULL)) ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
{ {
addReplyError(c,"Bad data format"); addReplyError(c,"Bad data format");
return; return;
@ -5203,7 +5203,7 @@ try_again:
/* Emit the payload argument, that is the serialized object using /* Emit the payload argument, that is the serialized object using
* the DUMP format. */ * the DUMP format. */
createDumpPayload(&payload,ov[j]); createDumpPayload(&payload,ov[j],kv[j]);
serverAssertWithInfo(c,NULL, serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr, rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr))); sdslen(payload.io.buffer.ptr)));

View File

@ -1074,8 +1074,8 @@ void configSetCommand(client *c) {
int soft_seconds; int soft_seconds;
class = getClientTypeByName(v[j]); class = getClientTypeByName(v[j]);
hard = strtoll(v[j+1],NULL,10); hard = memtoll(v[j+1],NULL);
soft = strtoll(v[j+2],NULL,10); soft = memtoll(v[j+2],NULL);
soft_seconds = strtoll(v[j+3],NULL,10); soft_seconds = strtoll(v[j+3],NULL,10);
server.client_obuf_limits[class].hard_limit_bytes = hard; server.client_obuf_limits[class].hard_limit_bytes = hard;

View File

@ -83,6 +83,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
* 1. A key gets expired if it reached it's TTL. * 1. A key gets expired if it reached it's TTL.
* 2. The key last access time is updated. * 2. The key last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO). * 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
* *
* This API should not be used when we write to the key after obtaining * This API should not be used when we write to the key after obtaining
* the object linked to the key, but only for read only operations. * the object linked to the key, but only for read only operations.
@ -106,6 +107,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
* to return NULL ASAP. */ * to return NULL ASAP. */
if (server.masterhost == NULL) { if (server.masterhost == NULL) {
server.stat_keyspace_misses++; server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL; return NULL;
} }
@ -127,12 +129,15 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
server.current_client->cmd->flags & CMD_READONLY) server.current_client->cmd->flags & CMD_READONLY)
{ {
server.stat_keyspace_misses++; server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL; return NULL;
} }
} }
val = lookupKey(db,key,flags); val = lookupKey(db,key,flags);
if (val == NULL) if (val == NULL) {
server.stat_keyspace_misses++; server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
else else
server.stat_keyspace_hits++; server.stat_keyspace_hits++;
return val; return val;

View File

@ -659,7 +659,7 @@ void georadiusGeneric(client *c, int flags) {
zsetConvertToZiplistIfNeeded(zobj,maxelelen); zsetConvertToZiplistIfNeeded(zobj,maxelelen);
setKey(c->db,storekey,zobj); setKey(c->db,storekey,zobj);
decrRefCount(zobj); decrRefCount(zobj);
notifyKeyspaceEvent(NOTIFY_LIST,"georadiusstore",storekey, notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey,
c->db->id); c->db->id);
server.dirty += returned_items; server.dirty += returned_items;
} else if (dbDelete(c->db,storekey)) { } else if (dbDelete(c->db,storekey)) {

View File

@ -614,6 +614,7 @@ int hllSparseToDense(robj *o) {
} else { } else {
runlen = HLL_SPARSE_VAL_LEN(p); runlen = HLL_SPARSE_VAL_LEN(p);
regval = HLL_SPARSE_VAL_VALUE(p); regval = HLL_SPARSE_VAL_VALUE(p);
if ((runlen + idx) > HLL_REGISTERS) break; /* Overflow. */
while(runlen--) { while(runlen--) {
HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval); HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval);
idx++; idx++;
@ -1013,7 +1014,12 @@ uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
double m = HLL_REGISTERS; double m = HLL_REGISTERS;
double E; double E;
int j; int j;
int reghisto[HLL_Q+2] = {0}; /* Note that reghisto size could be just HLL_Q+2, becuase HLL_Q+1 is
* the maximum frequency of the "000...1" sequence the hash function is
* able to return. However it is slow to check for sanity of the
* input: instead we history array at a safe size: overflows will
* just write data to wrong, but correctly allocated, places. */
int reghisto[64] = {0};
/* Compute register histogram */ /* Compute register histogram */
if (hdr->encoding == HLL_DENSE) { if (hdr->encoding == HLL_DENSE) {
@ -1088,6 +1094,7 @@ int hllMerge(uint8_t *max, robj *hll) {
} else { } else {
runlen = HLL_SPARSE_VAL_LEN(p); runlen = HLL_SPARSE_VAL_LEN(p);
regval = HLL_SPARSE_VAL_VALUE(p); regval = HLL_SPARSE_VAL_VALUE(p);
if ((runlen + i) > HLL_REGISTERS) break; /* Overflow. */
while(runlen--) { while(runlen--) {
if (regval > max[i]) max[i] = regval; if (regval > max[i]) max[i] = regval;
i++; i++;

View File

@ -47,9 +47,23 @@ struct RedisModule {
int ver; /* Module version. We use just progressive integers. */ int ver; /* Module version. We use just progressive integers. */
int apiver; /* Module API version as requested during initialization.*/ int apiver; /* Module API version as requested during initialization.*/
list *types; /* Module data types. */ list *types; /* Module data types. */
list *usedby; /* List of modules using APIs from this one. */
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
int in_call; /* RM_Call() nesting level */
}; };
typedef struct RedisModule RedisModule; typedef struct RedisModule RedisModule;
/* This represents a shared API. Shared APIs will be used to populate
* the server.sharedapi dictionary, mapping names of APIs exported by
* modules for other modules to use, to their structure specifying the
* function pointer that can be called. */
struct RedisModuleSharedAPI {
void *func;
RedisModule *module;
};
typedef struct RedisModuleSharedAPI RedisModuleSharedAPI;
static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/ static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
/* Entries in the context->amqueue array, representing objects to free /* Entries in the context->amqueue array, representing objects to free
@ -258,6 +272,25 @@ typedef struct RedisModuleDictIter {
raxIterator ri; raxIterator ri;
} RedisModuleDictIter; } RedisModuleDictIter;
typedef struct RedisModuleCommandFilterCtx {
RedisModuleString **argv;
int argc;
} RedisModuleCommandFilterCtx;
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
typedef struct RedisModuleCommandFilter {
/* The module that registered the filter */
RedisModule *module;
/* Filter callback function */
RedisModuleCommandFilterFunc callback;
/* REDISMODULE_CMDFILTER_* flags */
int flags;
} RedisModuleCommandFilter;
/* Registered filters */
static list *moduleCommandFilters;
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Prototypes * Prototypes
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -509,6 +542,22 @@ void RedisModuleCommandDispatcher(client *c) {
cp->func(&ctx,(void**)c->argv,c->argc); cp->func(&ctx,(void**)c->argv,c->argc);
moduleHandlePropagationAfterCommandCallback(&ctx); moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
/* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
* expand the query buffer, and in order to avoid a big object copy
* the query buffer SDS may be used directly as the SDS string backing
* the client argument vectors: sometimes this will result in the SDS
* string having unused space at the end. Later if a module takes ownership
* of the RedisString, such space will be wasted forever. Inside the
* Redis core this is not a problem because tryObjectEncoding() is called
* before storing strings in the key space. Here we need to do it
* for the module. */
for (int i = 0; i < c->argc; i++) {
/* Only do the work if the module took ownership of the object:
* in that case the refcount is no longer 1. */
if (c->argv[i]->refcount > 1)
trimStringObjectIfNeeded(c->argv[i]);
}
} }
/* This function returns the list of keys, with the same interface as the /* This function returns the list of keys, with the same interface as the
@ -701,6 +750,10 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->ver = ver; module->ver = ver;
module->apiver = apiver; module->apiver = apiver;
module->types = listCreate(); module->types = listCreate();
module->usedby = listCreate();
module->using = listCreate();
module->filters = listCreate();
module->in_call = 0;
ctx->module = module; ctx->module = module;
} }
@ -2694,12 +2747,6 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
RedisModuleCallReply *reply = NULL; RedisModuleCallReply *reply = NULL;
int replicate = 0; /* Replicate this command? */ int replicate = 0; /* Replicate this command? */
cmd = lookupCommandByCString((char*)cmdname);
if (!cmd) {
errno = EINVAL;
return NULL;
}
/* Create the client and dispatch the command. */ /* Create the client and dispatch the command. */
va_start(ap, fmt); va_start(ap, fmt);
c = createClient(-1); c = createClient(-1);
@ -2713,11 +2760,25 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
c->db = ctx->client->db; c->db = ctx->client->db;
c->argv = argv; c->argv = argv;
c->argc = argc; c->argc = argc;
c->cmd = c->lastcmd = cmd; if (ctx->module) ctx->module->in_call++;
/* We handle the above format error only when the client is setup so that /* We handle the above format error only when the client is setup so that
* we can free it normally. */ * we can free it normally. */
if (argv == NULL) goto cleanup; if (argv == NULL) goto cleanup;
/* Call command filters */
moduleCallCommandFilters(c);
/* Lookup command now, after filters had a chance to make modifications
* if necessary.
*/
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
errno = EINVAL;
goto cleanup;
}
c->cmd = c->lastcmd = cmd;
/* Basic arity checks. */ /* Basic arity checks. */
if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
errno = EINVAL; errno = EINVAL;
@ -2767,6 +2828,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply); autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
cleanup: cleanup:
if (ctx->module) ctx->module->in_call--;
freeClient(c); freeClient(c);
return reply; return reply;
} }
@ -3408,6 +3470,14 @@ RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
return io->ctx; return io->ctx;
} }
/* Returns a RedisModuleString with the name of the key currently saving or
* loading, when an IO data type callback is called. There is no guarantee
* that the key name is always available, so this may return NULL.
*/
const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
return io->key;
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Logging * Logging
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -3429,6 +3499,8 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li
else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING; else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
else level = LL_VERBOSE; /* Default. */ else level = LL_VERBOSE; /* Default. */
if (level < server.verbosity) return;
name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
serverLogRaw(level,msg); serverLogRaw(level,msg);
@ -3675,14 +3747,7 @@ void moduleHandleBlockedClients(void) {
* replies to send to the client in a thread safe context. * replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and * We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */ * free the temporary client we just used for the replies. */
if (c) { if (c) AddReplyFromClient(c, bc->reply_client);
if (bc->reply_client->bufpos)
addReplyProto(c,bc->reply_client->buf,
bc->reply_client->bufpos);
if (listLength(bc->reply_client->reply))
listJoin(c->reply,bc->reply_client->reply);
c->reply_bytes += bc->reply_client->reply_bytes;
}
freeClient(bc->reply_client); freeClient(bc->reply_client);
if (c != NULL) { if (c != NULL) {
@ -4623,6 +4688,329 @@ void RM_GetRandomHexChars(char *dst, size_t len) {
getRandomHexChars(dst,len); getRandomHexChars(dst,len);
} }
/* --------------------------------------------------------------------------
* Modules API exporting / importing
* -------------------------------------------------------------------------- */
/* This function is called by a module in order to export some API with a
* given name. Other modules will be able to use this API by calling the
* symmetrical function RM_GetSharedAPI() and casting the return value to
* the right function pointer.
*
* The function will return REDISMODULE_OK if the name is not already taken,
* otherwise REDISMODULE_ERR will be returned and no operation will be
* performed.
*
* IMPORTANT: the apiname argument should be a string literal with static
* lifetime. The API relies on the fact that it will always be valid in
* the future. */
int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) {
RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi));
sapi->module = ctx->module;
sapi->func = func;
if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) {
zfree(sapi);
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}
/* Request an exported API pointer. The return value is just a void pointer
* that the caller of this function will be required to cast to the right
* function pointer, so this is a private contract between modules.
*
* If the requested API is not available then NULL is returned. Because
* modules can be loaded at different times with different order, this
* function calls should be put inside some module generic API registering
* step, that is called every time a module attempts to execute a
* command that requires external APIs: if some API cannot be resolved, the
* command should return an error.
*
* Here is an exmaple:
*
* int ... myCommandImplementation() {
* if (getExternalAPIs() == 0) {
* reply with an error here if we cannot have the APIs
* }
* // Use the API:
* myFunctionPointer(foo);
* }
*
* And the function registerAPI() is:
*
* int getExternalAPIs(void) {
* static int api_loaded = 0;
* if (api_loaded != 0) return 1; // APIs already resolved.
*
* myFunctionPointer = RedisModule_GetOtherModuleAPI("...");
* if (myFunctionPointer == NULL) return 0;
*
* return 1;
* }
*/
void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) {
dictEntry *de = dictFind(server.sharedapi, apiname);
if (de == NULL) return NULL;
RedisModuleSharedAPI *sapi = dictGetVal(de);
if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) {
listAddNodeTail(sapi->module->usedby,ctx->module);
listAddNodeTail(ctx->module->using,sapi->module);
}
return sapi->func;
}
/* Remove all the APIs registered by the specified module. Usually you
* want this when the module is going to be unloaded. This function
* assumes that's caller responsibility to make sure the APIs are not
* used by other modules.
*
* The number of unregistered APIs is returned. */
int moduleUnregisterSharedAPI(RedisModule *module) {
int count = 0;
dictIterator *di = dictGetSafeIterator(server.sharedapi);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
const char *apiname = dictGetKey(de);
RedisModuleSharedAPI *sapi = dictGetVal(de);
if (sapi->module == module) {
dictDelete(server.sharedapi,apiname);
zfree(sapi);
count++;
}
}
dictReleaseIterator(di);
return count;
}
/* Remove the specified module as an user of APIs of ever other module.
* This is usually called when a module is unloaded.
*
* Returns the number of modules this module was using APIs from. */
int moduleUnregisterUsedAPI(RedisModule *module) {
listIter li;
listNode *ln;
int count = 0;
listRewind(module->using,&li);
while((ln = listNext(&li))) {
RedisModule *used = ln->value;
listNode *ln = listSearchKey(used->usedby,module);
if (ln) {
listDelNode(module->using,ln);
count++;
}
}
return count;
}
/* Unregister all filters registered by a module.
* This is called when a module is being unloaded.
*
* Returns the number of filters unregistered. */
int moduleUnregisterFilters(RedisModule *module) {
listIter li;
listNode *ln;
int count = 0;
listRewind(module->filters,&li);
while((ln = listNext(&li))) {
RedisModuleCommandFilter *filter = ln->value;
listNode *ln = listSearchKey(moduleCommandFilters,filter);
if (ln) {
listDelNode(moduleCommandFilters,ln);
count++;
}
zfree(filter);
}
return count;
}
/* --------------------------------------------------------------------------
* Module Command Filter API
* -------------------------------------------------------------------------- */
/* Register a new command filter function.
*
* Command filtering makes it possible for modules to extend Redis by plugging
* into the execution flow of all commands.
*
* A registered filter gets called before Redis executes *any* command. This
* includes both core Redis commands and commands registered by any module. The
* filter applies in all execution paths including:
*
* 1. Invocation by a client.
* 2. Invocation through `RedisModule_Call()` by any module.
* 3. Invocation through Lua 'redis.call()`.
* 4. Replication of a command from a master.
*
* The filter executes in a special filter context, which is different and more
* limited than a RedisModuleCtx. Because the filter affects any command, it
* must be implemented in a very efficient way to reduce the performance impact
* on Redis. All Redis Module API calls that require a valid context (such as
* `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a
* filter context.
*
* The `RedisModuleCommandFilterCtx` can be used to inspect or modify the
* executed command and its arguments. As the filter executes before Redis
* begins processing the command, any change will affect the way the command is
* processed. For example, a module can override Redis commands this way:
*
* 1. Register a `MODULE.SET` command which implements an extended version of
* the Redis `SET` command.
* 2. Register a command filter which detects invocation of `SET` on a specific
* pattern of keys. Once detected, the filter will replace the first
* argument from `SET` to `MODULE.SET`.
* 3. When filter execution is complete, Redis considers the new command name
* and therefore executes the module's own command.
*
* Note that in the above use case, if `MODULE.SET` itself uses
* `RedisModule_Call()` the filter will be applied on that call as well. If
* that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when
* registering the filter.
*
* The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that
* originate from the module's own `RM_Call()` from reaching the filter. This
* flag is effective for all execution flows, including nested ones, as long as
* the execution begins from the module's command context or a thread-safe
* context that is associated with a blocking command.
*
* Detached thread-safe contexts are *not* associated with the module and cannot
* be protected by this flag.
*
* If multiple filters are registered (by the same or different modules), they
* are executed in the order of registration.
*/
RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) {
RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter));
filter->module = ctx->module;
filter->callback = callback;
filter->flags = flags;
listAddNodeTail(moduleCommandFilters, filter);
listAddNodeTail(ctx->module->filters, filter);
return filter;
}
/* Unregister a command filter.
*/
int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) {
listNode *ln;
/* A module can only remove its own filters */
if (filter->module != ctx->module) return REDISMODULE_ERR;
ln = listSearchKey(moduleCommandFilters,filter);
if (!ln) return REDISMODULE_ERR;
listDelNode(moduleCommandFilters,ln);
ln = listSearchKey(ctx->module->filters,filter);
if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */
listDelNode(ctx->module->filters,ln);
return REDISMODULE_OK;
}
void moduleCallCommandFilters(client *c) {
if (listLength(moduleCommandFilters) == 0) return;
listIter li;
listNode *ln;
listRewind(moduleCommandFilters,&li);
RedisModuleCommandFilterCtx filter = {
.argv = c->argv,
.argc = c->argc
};
while((ln = listNext(&li))) {
RedisModuleCommandFilter *f = ln->value;
/* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is
* currently processing a command.
*/
if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue;
/* Call filter */
f->callback(&filter);
}
c->argv = filter.argv;
c->argc = filter.argc;
}
/* Return the number of arguments a filtered command has. The number of
* arguments include the command itself.
*/
int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx)
{
return fctx->argc;
}
/* Return the specified command argument. The first argument (position 0) is
* the command itself, and the rest are user-provided args.
*/
const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos)
{
if (pos < 0 || pos >= fctx->argc) return NULL;
return fctx->argv[pos];
}
/* Modify the filtered command by inserting a new argument at the specified
* position. The specified RedisModuleString argument may be used by Redis
* after the filter context is destroyed, so it must not be auto-memory
* allocated, freed or used elsewhere.
*/
int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
{
int i;
if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR;
fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *));
for (i = fctx->argc; i > pos; i--) {
fctx->argv[i] = fctx->argv[i-1];
}
fctx->argv[pos] = arg;
fctx->argc++;
return REDISMODULE_OK;
}
/* Modify the filtered command by replacing an existing argument with a new one.
* The specified RedisModuleString argument may be used by Redis after the
* filter context is destroyed, so it must not be auto-memory allocated, freed
* or used elsewhere.
*/
int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
{
if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
decrRefCount(fctx->argv[pos]);
fctx->argv[pos] = arg;
return REDISMODULE_OK;
}
/* Modify the filtered command by deleting an argument at the specified
* position.
*/
int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
{
int i;
if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
decrRefCount(fctx->argv[pos]);
for (i = pos; i < fctx->argc-1; i++) {
fctx->argv[i] = fctx->argv[i+1];
}
fctx->argc--;
return REDISMODULE_OK;
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Modules API internals * Modules API internals
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -4669,6 +5057,9 @@ void moduleInitModulesSystem(void) {
moduleFreeContextReusedClient->flags |= CLIENT_MODULE; moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleFreeContextReusedClient->user = NULL; /* root user. */ moduleFreeContextReusedClient->user = NULL; /* root user. */
/* Set up filter list */
moduleCommandFilters = listCreate();
moduleRegisterCoreAPI(); moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) { if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
@ -4718,6 +5109,7 @@ void moduleLoadFromQueue(void) {
void moduleFreeModuleStructure(struct RedisModule *module) { void moduleFreeModuleStructure(struct RedisModule *module) {
listRelease(module->types); listRelease(module->types);
listRelease(module->filters);
sdsfree(module->name); sdsfree(module->name);
zfree(module); zfree(module);
} }
@ -4767,6 +5159,8 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
if (ctx.module) { if (ctx.module) {
moduleUnregisterCommands(ctx.module); moduleUnregisterCommands(ctx.module);
moduleUnregisterSharedAPI(ctx.module);
moduleUnregisterUsedAPI(ctx.module);
moduleFreeModuleStructure(ctx.module); moduleFreeModuleStructure(ctx.module);
} }
dlclose(handle); dlclose(handle);
@ -4796,14 +5190,18 @@ int moduleUnload(sds name) {
if (module == NULL) { if (module == NULL) {
errno = ENOENT; errno = ENOENT;
return REDISMODULE_ERR; return REDISMODULE_ERR;
} } else if (listLength(module->types)) {
if (listLength(module->types)) {
errno = EBUSY; errno = EBUSY;
return REDISMODULE_ERR; return REDISMODULE_ERR;
} else if (listLength(module->usedby)) {
errno = EPERM;
return REDISMODULE_ERR;
} }
moduleUnregisterCommands(module); moduleUnregisterCommands(module);
moduleUnregisterSharedAPI(module);
moduleUnregisterUsedAPI(module);
moduleUnregisterFilters(module);
/* Remove any notification subscribers this module might have */ /* Remove any notification subscribers this module might have */
moduleUnsubscribeNotifications(module); moduleUnsubscribeNotifications(module);
@ -4884,7 +5282,12 @@ NULL
errmsg = "no such module with that name"; errmsg = "no such module with that name";
break; break;
case EBUSY: case EBUSY:
errmsg = "the module exports one or more module-side data types, can't unload"; errmsg = "the module exports one or more module-side data "
"types, can't unload";
break;
case EPERM:
errmsg = "the module exports APIs used by other modules. "
"Please unload them first and try again";
break; break;
default: default:
errmsg = "operation not possible."; errmsg = "operation not possible.";
@ -4909,6 +5312,7 @@ size_t moduleCount(void) {
* file so that's easy to seek it to add new entries. */ * file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) { void moduleRegisterCoreAPI(void) {
server.moduleapi = dictCreate(&moduleAPIDictType,NULL); server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
REGISTER_API(Alloc); REGISTER_API(Alloc);
REGISTER_API(Calloc); REGISTER_API(Calloc);
REGISTER_API(Realloc); REGISTER_API(Realloc);
@ -5006,6 +5410,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RetainString); REGISTER_API(RetainString);
REGISTER_API(StringCompare); REGISTER_API(StringCompare);
REGISTER_API(GetContextFromIO); REGISTER_API(GetContextFromIO);
REGISTER_API(GetKeyNameFromIO);
REGISTER_API(BlockClient); REGISTER_API(BlockClient);
REGISTER_API(UnblockClient); REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest); REGISTER_API(IsBlockedReplyRequest);
@ -5059,4 +5464,13 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(DictPrev); REGISTER_API(DictPrev);
REGISTER_API(DictCompareC); REGISTER_API(DictCompareC);
REGISTER_API(DictCompare); REGISTER_API(DictCompare);
REGISTER_API(ExportSharedAPI);
REGISTER_API(GetSharedAPI);
REGISTER_API(RegisterCommandFilter);
REGISTER_API(UnregisterCommandFilter);
REGISTER_API(CommandFilterArgsCount);
REGISTER_API(CommandFilterArgGet);
REGISTER_API(CommandFilterArgInsert);
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
} }

View File

@ -46,7 +46,6 @@ hellotimer.so: hellotimer.xo
hellodict.xo: ../redismodule.h hellodict.xo: ../redismodule.h
hellodict.so: hellodict.xo hellodict.so: hellodict.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
testmodule.xo: ../redismodule.h testmodule.xo: ../redismodule.h

View File

@ -188,6 +188,10 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); RedisModule_Call(ctx, "LPUSH", "cc", "l", "y");
RedisModule_Call(ctx, "LPUSH", "cc", "l", "y"); RedisModule_Call(ctx, "LPUSH", "cc", "l", "y");
/* Miss some keys intentionally so we will get a "keymiss" notification. */
RedisModule_Call(ctx, "GET", "c", "nosuchkey");
RedisModule_Call(ctx, "SMEMBERS", "c", "nosuchkey");
size_t sz; size_t sz;
const char *rep; const char *rep;
RedisModuleCallReply *r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo"); RedisModuleCallReply *r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "foo");
@ -225,6 +229,16 @@ int TestNotifications(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
FAIL("Wrong reply for l"); FAIL("Wrong reply for l");
} }
r = RedisModule_Call(ctx, "HGET", "cc", "notifications", "nosuchkey");
if (r == NULL || RedisModule_CallReplyType(r) != REDISMODULE_REPLY_STRING) {
FAIL("Wrong or no reply for nosuchkey");
} else {
rep = RedisModule_CallReplyStringPtr(r, &sz);
if (sz != 1 || *rep != '2') {
FAIL("Got reply '%.*s'. expected '2'", sz, rep);
}
}
RedisModule_Call(ctx, "FLUSHDB", ""); RedisModule_Call(ctx, "FLUSHDB", "");
return RedisModule_ReplyWithSimpleString(ctx, "OK"); return RedisModule_ReplyWithSimpleString(ctx, "OK");
@ -435,7 +449,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModule_SubscribeToKeyspaceEvents(ctx, RedisModule_SubscribeToKeyspaceEvents(ctx,
REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_HASH |
REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_SET |
REDISMODULE_NOTIFY_STRING, REDISMODULE_NOTIFY_STRING |
REDISMODULE_NOTIFY_KEY_MISS,
NotifyCallback); NotifyCallback);
if (RedisModule_CreateCommand(ctx,"test.notify", if (RedisModule_CreateCommand(ctx,"test.notify",
TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR) TestNotifications,"write deny-oom",1,1,1) == REDISMODULE_ERR)

View File

@ -744,6 +744,19 @@ void addReplySubcommandSyntaxError(client *c) {
sdsfree(cmd); sdsfree(cmd);
} }
/* Append 'src' client output buffers into 'dst' client output buffers.
* This function clears the output buffers of 'src' */
void AddReplyFromClient(client *dst, client *src) {
if (prepareClientToWrite(dst) != C_OK)
return;
addReplyProto(dst,src->buf, src->bufpos);
if (listLength(src->reply))
listJoin(dst->reply,src->reply);
dst->reply_bytes += src->reply_bytes;
src->reply_bytes = 0;
src->bufpos = 0;
}
/* Copy 'src' client output buffers into 'dst' client output buffers. /* Copy 'src' client output buffers into 'dst' client output buffers.
* The function takes care of freeing the old output buffers of the * The function takes care of freeing the old output buffers of the
* destination client. */ * destination client. */
@ -911,6 +924,16 @@ void unlinkClient(client *c) {
c->client_list_node = NULL; c->client_list_node = NULL;
} }
/* In the case of diskless replication the fork is writing to the
* sockets and just closing the fd isn't enough, if we don't also
* shutdown the socket the fork will continue to write to the slave
* and the salve will only find out that it was disconnected when
* it will finish reading the rdb. */
if ((c->flags & CLIENT_SLAVE) &&
(c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)) {
shutdown(c->fd, SHUT_RDWR);
}
/* Unregister async I/O handlers and close the socket. */ /* Unregister async I/O handlers and close the socket. */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

View File

@ -55,6 +55,7 @@ int keyspaceEventsStringToFlags(char *classes) {
case 'K': flags |= NOTIFY_KEYSPACE; break; case 'K': flags |= NOTIFY_KEYSPACE; break;
case 'E': flags |= NOTIFY_KEYEVENT; break; case 'E': flags |= NOTIFY_KEYEVENT; break;
case 't': flags |= NOTIFY_STREAM; break; case 't': flags |= NOTIFY_STREAM; break;
case 'm': flags |= NOTIFY_KEY_MISS; break;
default: return -1; default: return -1;
} }
} }
@ -81,6 +82,7 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
} }
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);

View File

@ -415,6 +415,18 @@ int isObjectRepresentableAsLongLong(robj *o, long long *llval) {
} }
} }
/* Optimize the SDS string inside the string object to require little space,
* in case there is more than 10% of free space at the end of the SDS
* string. This happens because SDS strings tend to overallocate to avoid
* wasting too much time in allocations when appending to the string. */
void trimStringObjectIfNeeded(robj *o) {
if (o->encoding == OBJ_ENCODING_RAW &&
sdsavail(o->ptr) > sdslen(o->ptr)/10)
{
o->ptr = sdsRemoveFreeSpace(o->ptr);
}
}
/* Try to encode a string object in order to save space */ /* Try to encode a string object in order to save space */
robj *tryObjectEncoding(robj *o) { robj *tryObjectEncoding(robj *o) {
long value; long value;
@ -484,11 +496,7 @@ robj *tryObjectEncoding(robj *o) {
* We do that only for relatively large strings as this branch * We do that only for relatively large strings as this branch
* is only entered if the length of the string is greater than * is only entered if the length of the string is greater than
* OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */ * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
if (o->encoding == OBJ_ENCODING_RAW && trimStringObjectIfNeeded(o);
sdsavail(s) > len/10)
{
o->ptr = sdsRemoveFreeSpace(o->ptr);
}
/* Return the original object. */ /* Return the original object. */
return o; return o;
@ -1202,16 +1210,20 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
} }
} else if (lru_idle >= 0) { } else if (lru_idle >= 0) {
/* Serialized LRU idle time is in seconds. Scale /* Provided LRU idle time is in seconds. Scale
* according to the LRU clock resolution this Redis * according to the LRU clock resolution this Redis
* instance was compiled with (normally 1000 ms, so the * instance was compiled with (normally 1000 ms, so the
* below statement will expand to lru_idle*1000/1000. */ * below statement will expand to lru_idle*1000/1000. */
lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION; lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
val->lru = lru_clock - lru_idle; long lru_abs = lru_clock - lru_idle; /* Absolute access time. */
/* If the lru field overflows (since LRU it is a wrapping /* If the LRU field underflows (since LRU it is a wrapping
* clock), the best we can do is to provide the maximum * clock), the best we can do is to provide a large enough LRU
* representable idle time. */ * that is half-way in the circlular LRU clock we use: this way
if (val->lru < 0) val->lru = lru_clock+1; * the computed idle time for this object will stay high for quite
* some time. */
if (lru_abs < 0)
lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX;
val->lru = lru_abs;
} }
} }

View File

@ -751,7 +751,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
/* Save a Redis object. /* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */ * Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o) { ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
ssize_t n = 0, nwritten = 0; ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) { if (o->type == OBJ_STRING) {
@ -966,7 +966,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
RedisModuleIO io; RedisModuleIO io;
moduleValue *mv = o->ptr; moduleValue *mv = o->ptr;
moduleType *mt = mv->type; moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb); moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able /* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */ * to call the right module during loading. */
@ -996,7 +996,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
* this length with very little changes to the code. In the future * this length with very little changes to the code. In the future
* we could switch to a faster solution. */ * we could switch to a faster solution. */
size_t rdbSavedObjectLen(robj *o) { size_t rdbSavedObjectLen(robj *o) {
ssize_t len = rdbSaveObject(NULL,o); ssize_t len = rdbSaveObject(NULL,o,NULL);
serverAssertWithInfo(NULL,o,len != -1); serverAssertWithInfo(NULL,o,len != -1);
return len; return len;
} }
@ -1038,7 +1038,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
/* Save type, key, value */ /* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1;
return 1; return 1;
} }
@ -1380,7 +1380,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
/* Load a Redis object of the specified type from the specified file. /* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL. */ * On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb) { robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
robj *o = NULL, *ele, *dec; robj *o = NULL, *ele, *dec;
uint64_t len; uint64_t len;
unsigned int i; unsigned int i;
@ -1767,7 +1767,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
exit(1); exit(1);
} }
RedisModuleIO io; RedisModuleIO io;
moduleInitIOContext(io,mt,rdb); moduleInitIOContext(io,mt,rdb,key);
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
/* Call the rdb_load method of the module providing the 10 bit /* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */ * encoding version in the lower 10 bits of the module ID. */
@ -2023,7 +2023,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
/* Read key */ /* Read key */
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */ /* Read value */
if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr; if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is * received from the master. In the latter case, the master is

View File

@ -140,9 +140,9 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid); void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi); int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o); ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
size_t rdbSavedObjectLen(robj *o); size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb); robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal); void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
robj *rdbLoadStringObject(rio *rdb); robj *rdbLoadStringObject(rio *rdb);

View File

@ -1197,7 +1197,7 @@ static int fetchClusterSlotsConfiguration(client c) {
assert(reply->type == REDIS_REPLY_ARRAY); assert(reply->type == REDIS_REPLY_ARRAY);
for (i = 0; i < reply->elements; i++) { for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i]; redisReply *r = reply->element[i];
assert(r->type = REDIS_REPLY_ARRAY); assert(r->type == REDIS_REPLY_ARRAY);
assert(r->elements >= 3); assert(r->elements >= 3);
int from, to, slot; int from, to, slot;
from = r->element[0]->integer; from = r->element[0]->integer;

View File

@ -33,8 +33,8 @@
#define ERROR(...) { \ #define ERROR(...) { \
char __buf[1024]; \ char __buf[1024]; \
sprintf(__buf, __VA_ARGS__); \ snprintf(__buf, sizeof(__buf), __VA_ARGS__); \
sprintf(error, "0x%16llx: %s", (long long)epos, __buf); \ snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \
} }
static char error[1024]; static char error[1024];

View File

@ -285,7 +285,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbstate.keys++; rdbstate.keys++;
/* Read value */ /* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr;
/* Check if the key already expired. */ /* Check if the key already expired. */
if (expiretime != -1 && expiretime < now) if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++; rdbstate.already_expired++;

View File

@ -98,7 +98,8 @@
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */ #define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */ #define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ #define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */
/* A special pointer that we can use between the core and the module to signal /* A special pointer that we can use between the core and the module to signal
@ -132,6 +133,11 @@
* of timers that are going to expire, sorted by expire time. */ * of timers that are going to expire, sorted by expire time. */
typedef uint64_t RedisModuleTimerID; typedef uint64_t RedisModuleTimerID;
/* CommandFilter Flags */
/* Do filter RedisModule_Call() commands initiated by module itself. */
#define REDISMODULE_CMDFILTER_NOSELF (1<<0)
/* ------------------------- End of common defines ------------------------ */ /* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE #ifndef REDISMODULE_CORE
@ -150,6 +156,8 @@ typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef struct RedisModuleClusterInfo RedisModuleClusterInfo; typedef struct RedisModuleClusterInfo RedisModuleClusterInfo;
typedef struct RedisModuleDict RedisModuleDict; typedef struct RedisModuleDict RedisModuleDict;
typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleDictIter RedisModuleDictIter;
typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx;
typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
@ -162,6 +170,7 @@ typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value
typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
#define REDISMODULE_TYPE_METHOD_VERSION 1 #define REDISMODULE_TYPE_METHOD_VERSION 1
typedef struct RedisModuleTypeMethods { typedef struct RedisModuleTypeMethods {
@ -278,6 +287,7 @@ int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, Re
void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io); RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io);
const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetKeyNameFromIO)(RedisModuleIO *io);
long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void); long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len); void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len);
void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele); void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele);
@ -335,6 +345,15 @@ void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len); void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback); void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback);
void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags);
int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func);
void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname);
RedisModuleCommandFilter *REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb, int flags);
int REDISMODULE_API_FUNC(RedisModule_UnregisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgsCount)(RedisModuleCommandFilterCtx *fctx);
const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(RedisModuleCommandFilterCtx *fctx, int pos);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos);
#endif #endif
/* This is included inline inside each Redis module. */ /* This is included inline inside each Redis module. */
@ -440,6 +459,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(RetainString);
REDISMODULE_GET_API(StringCompare); REDISMODULE_GET_API(StringCompare);
REDISMODULE_GET_API(GetContextFromIO); REDISMODULE_GET_API(GetContextFromIO);
REDISMODULE_GET_API(GetKeyNameFromIO);
REDISMODULE_GET_API(Milliseconds); REDISMODULE_GET_API(Milliseconds);
REDISMODULE_GET_API(DigestAddStringBuffer); REDISMODULE_GET_API(DigestAddStringBuffer);
REDISMODULE_GET_API(DigestAddLongLong); REDISMODULE_GET_API(DigestAddLongLong);
@ -495,6 +515,15 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetRandomBytes); REDISMODULE_GET_API(GetRandomBytes);
REDISMODULE_GET_API(GetRandomHexChars); REDISMODULE_GET_API(GetRandomHexChars);
REDISMODULE_GET_API(SetClusterFlags); REDISMODULE_GET_API(SetClusterFlags);
REDISMODULE_GET_API(ExportSharedAPI);
REDISMODULE_GET_API(GetSharedAPI);
REDISMODULE_GET_API(RegisterCommandFilter);
REDISMODULE_GET_API(UnregisterCommandFilter);
REDISMODULE_GET_API(CommandFilterArgsCount);
REDISMODULE_GET_API(CommandFilterArgGet);
REDISMODULE_GET_API(CommandFilterArgInsert);
REDISMODULE_GET_API(CommandFilterArgReplace);
REDISMODULE_GET_API(CommandFilterArgDelete);
#endif #endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;

View File

@ -593,6 +593,7 @@ int startBgsaveForReplication(int mincapa) {
client *slave = ln->value; client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE; slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves,ln); listDelNode(server.slaves,ln);
addReplyError(slave, addReplyError(slave,
@ -1090,14 +1091,23 @@ void replicationCreateMasterClient(int fd, int dbid) {
if (dbid != -1) selectDb(server.master,dbid); if (dbid != -1) selectDb(server.master,dbid);
} }
void restartAOF() { /* This function will try to re-enable the AOF file after the
int retry = 10; * master-replica synchronization: if it fails after multiple attempts
while (retry-- && startAppendOnly() == C_ERR) { * the replica cannot be considered reliable and exists with an
serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second."); * error. */
void restartAOFAfterSYNC() {
unsigned int tries, max_tries = 10;
for (tries = 0; tries < max_tries; ++tries) {
if (startAppendOnly() == C_OK) break;
serverLog(LL_WARNING,
"Failed enabling the AOF after successful master synchronization! "
"Trying it again in one second.");
sleep(1); sleep(1);
} }
if (!retry) { if (tries == max_tries) {
serverLog(LL_WARNING,"FATAL: this replica instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now."); serverLog(LL_WARNING,
"FATAL: this replica instance finished the synchronization with "
"its master, but the AOF can't be turned on. Exiting now.");
exit(1); exit(1);
} }
} }
@ -1284,7 +1294,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
cancelReplicationHandshake(); cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore /* Re-enable the AOF if we disabled it earlier, in order to restore
* the original configuration. */ * the original configuration. */
if (aof_is_enabled) restartAOF(); if (aof_is_enabled) restartAOFAfterSYNC();
return; return;
} }
/* Final setup of the connected slave <- master link */ /* Final setup of the connected slave <- master link */
@ -1309,7 +1319,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Restart the AOF subsystem now that we finished the sync. This /* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending * will trigger an AOF rewrite, and when done will start appending
* to the new file. */ * to the new file. */
if (aof_is_enabled) restartAOF(); if (aof_is_enabled) restartAOFAfterSYNC();
} }
return; return;
@ -2053,8 +2063,11 @@ void replicaofCommand(client *c) {
/* Check if we are already attached to the specified slave */ /* Check if we are already attached to the specified slave */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) { && server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed."); serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); "with the master we are already connected "
"with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified "
"master\r\n"));
return; return;
} }
/* There was no previous master or the user specified a different one, /* There was no previous master or the user specified a different one,

View File

@ -462,6 +462,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
c->argc = argc; c->argc = argc;
c->user = server.lua_caller->user; c->user = server.lua_caller->user;
/* Process module hooks */
moduleCallCommandFilters(c);
argv = c->argv;
argc = c->argc;
/* Log the command if debugging is active. */ /* Log the command if debugging is active. */
if (ldb.active && ldb.step) { if (ldb.active && ldb.step) {
sds cmdlog = sdsnew("<redis>"); sds cmdlog = sdsnew("<redis>");

View File

@ -257,8 +257,12 @@ sds sdsRemoveFreeSpace(sds s) {
char type, oldtype = s[-1] & SDS_TYPE_MASK; char type, oldtype = s[-1] & SDS_TYPE_MASK;
int hdrlen, oldhdrlen = sdsHdrSize(oldtype); int hdrlen, oldhdrlen = sdsHdrSize(oldtype);
size_t len = sdslen(s); size_t len = sdslen(s);
size_t avail = sdsavail(s);
sh = (char*)s-oldhdrlen; sh = (char*)s-oldhdrlen;
/* Return ASAP if there is no space left. */
if (avail == 0) return s;
/* Check what would be the minimum SDS header that is just good enough to /* Check what would be the minimum SDS header that is just good enough to
* fit this string. */ * fit this string. */
type = sdsReqType(len); type = sdsReqType(len);

View File

@ -715,7 +715,7 @@ struct redisCommand redisCommandTable[] = {
{"touch",touchCommand,-2, {"touch",touchCommand,-2,
"read-only fast @keyspace", "read-only fast @keyspace",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,-1,1,0,0,0},
{"pttl",pttlCommand,2, {"pttl",pttlCommand,2,
"read-only fast random @keyspace", "read-only fast random @keyspace",
@ -863,7 +863,7 @@ struct redisCommand redisCommandTable[] = {
"no-script @keyspace", "no-script @keyspace",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"command",commandCommand,0, {"command",commandCommand,-1,
"ok-loading ok-stale random @connection", "ok-loading ok-stale random @connection",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
@ -3268,6 +3268,8 @@ void call(client *c, int flags) {
* other operations can be performed by the caller. Otherwise * other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */ * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) { int processCommand(client *c) {
moduleCallCommandFilters(c);
/* The QUIT command is handled separately. Normal command procs will /* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble * go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in * when FORCE_REPLICATION is enabled and would be implemented in
@ -4500,6 +4502,7 @@ static void sigShutdownHandler(int sig) {
rdbRemoveTempFile(getpid()); rdbRemoveTempFile(getpid());
exit(1); /* Exit with an error since this was not a clean shutdown. */ exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (server.loading) { } else if (server.loading) {
serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
exit(0); exit(0);
} }

View File

@ -468,7 +468,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define NOTIFY_EXPIRED (1<<8) /* x */ #define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */ #define NOTIFY_EVICTED (1<<9) /* e */
#define NOTIFY_STREAM (1<<10) /* t */ #define NOTIFY_STREAM (1<<10) /* t */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ #define NOTIFY_KEY_MISS (1<<11) /* m */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */
/* Get the first bind addr or NULL */ /* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
@ -578,16 +579,18 @@ typedef struct RedisModuleIO {
int ver; /* Module serialization version: 1 (old), int ver; /* Module serialization version: 1 (old),
* 2 (current version with opcodes annotation). */ * 2 (current version with opcodes annotation). */
struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
struct redisObject *key; /* Optional name of key processed */
} RedisModuleIO; } RedisModuleIO;
/* Macro to initialize an IO context. Note that the 'ver' field is populated /* Macro to initialize an IO context. Note that the 'ver' field is populated
* inside rdb.c according to the version of the value to load. */ * inside rdb.c according to the version of the value to load. */
#define moduleInitIOContext(iovar,mtype,rioptr) do { \ #define moduleInitIOContext(iovar,mtype,rioptr,keyptr) do { \
iovar.rio = rioptr; \ iovar.rio = rioptr; \
iovar.type = mtype; \ iovar.type = mtype; \
iovar.bytes = 0; \ iovar.bytes = 0; \
iovar.error = 0; \ iovar.error = 0; \
iovar.ver = 0; \ iovar.ver = 0; \
iovar.key = keyptr; \
iovar.ctx = NULL; \ iovar.ctx = NULL; \
} while(0); } while(0);
@ -1026,7 +1029,9 @@ struct redisServer {
size_t initial_memory_usage; /* Bytes used after initialization. */ size_t initial_memory_usage; /* Bytes used after initialization. */
int always_show_logo; /* Show logo even for non-stdout logging. */ int always_show_logo; /* Show logo even for non-stdout logging. */
/* Modules */ /* Modules */
dict *moduleapi; /* Exported APIs dictionary for modules. */ dict *moduleapi; /* Exported core APIs dictionary for modules. */
dict *sharedapi; /* Like moduleapi but containing the APIs that
modules share with each other. */
list *loadmodule_queue; /* List of modules to load at startup. */ list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs client blocked on a module command needs
@ -1487,7 +1492,7 @@ size_t moduleCount(void);
void moduleAcquireGIL(void); void moduleAcquireGIL(void);
void moduleReleaseGIL(void); void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);
@ -1524,6 +1529,7 @@ void addReplyNullArray(client *c);
void addReplyBool(client *c, int b); void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len); void addReplyProto(client *c, const char *s, size_t len);
void AddReplyFromClient(client *c, client *src);
void addReplyBulk(client *c, robj *obj); void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s); void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkCBuffer(client *c, const void *p, size_t len);
@ -1660,6 +1666,7 @@ int compareStringObjects(robj *a, robj *b);
int collateStringObjects(robj *a, robj *b); int collateStringObjects(robj *a, robj *b);
int equalStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b);
unsigned long long estimateObjectIdleTime(robj *o); unsigned long long estimateObjectIdleTime(robj *o);
void trimStringObjectIfNeeded(robj *o);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
/* Synchronous I/O with timeout */ /* Synchronous I/O with timeout */

View File

@ -58,7 +58,7 @@ redisSortOperation *createSortOperation(int type, robj *pattern) {
* *
* The returned object will always have its refcount increased by 1 * The returned object will always have its refcount increased by 1
* when it is non-NULL. */ * when it is non-NULL. */
robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst, int writeflag) {
char *p, *f, *k; char *p, *f, *k;
sds spat, ssub; sds spat, ssub;
robj *keyobj, *fieldobj = NULL, *o; robj *keyobj, *fieldobj = NULL, *o;
@ -106,7 +106,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
decrRefCount(subst); /* Incremented by decodeObject() */ decrRefCount(subst); /* Incremented by decodeObject() */
/* Lookup substituted key */ /* Lookup substituted key */
if (!writeflag)
o = lookupKeyRead(db,keyobj); o = lookupKeyRead(db,keyobj);
else
o = lookupKeyWrite(db,keyobj);
if (o == NULL) goto noobj; if (o == NULL) goto noobj;
if (fieldobj) { if (fieldobj) {
@ -198,30 +201,12 @@ void sortCommand(client *c) {
robj *sortval, *sortby = NULL, *storekey = NULL; robj *sortval, *sortby = NULL, *storekey = NULL;
redisSortObject *vector; /* Resulting vector to sort */ redisSortObject *vector; /* Resulting vector to sort */
/* Lookup the key to sort. It must be of the right types */
sortval = lookupKeyRead(c->db,c->argv[1]);
if (sortval && sortval->type != OBJ_SET &&
sortval->type != OBJ_LIST &&
sortval->type != OBJ_ZSET)
{
addReply(c,shared.wrongtypeerr);
return;
}
/* Create a list of operations to perform for every sorted element. /* Create a list of operations to perform for every sorted element.
* Operations can be GET */ * Operations can be GET */
operations = listCreate(); operations = listCreate();
listSetFreeMethod(operations,zfree); listSetFreeMethod(operations,zfree);
j = 2; /* options start at argv[2] */ j = 2; /* options start at argv[2] */
/* Now we need to protect sortval incrementing its count, in the future
* SORT may have options able to overwrite/delete keys during the sorting
* and the sorted key itself may get destroyed */
if (sortval)
incrRefCount(sortval);
else
sortval = createQuicklistObject();
/* The SORT command has an SQL-alike syntax, parse it */ /* The SORT command has an SQL-alike syntax, parse it */
while(j < c->argc) { while(j < c->argc) {
int leftargs = c->argc-j-1; int leftargs = c->argc-j-1;
@ -280,11 +265,33 @@ void sortCommand(client *c) {
/* Handle syntax errors set during options parsing. */ /* Handle syntax errors set during options parsing. */
if (syntax_error) { if (syntax_error) {
decrRefCount(sortval);
listRelease(operations); listRelease(operations);
return; return;
} }
/* Lookup the key to sort. It must be of the right types */
if (storekey)
sortval = lookupKeyRead(c->db,c->argv[1]);
else
sortval = lookupKeyWrite(c->db,c->argv[1]);
if (sortval && sortval->type != OBJ_SET &&
sortval->type != OBJ_LIST &&
sortval->type != OBJ_ZSET)
{
listRelease(operations);
addReply(c,shared.wrongtypeerr);
return;
}
/* Now we need to protect sortval incrementing its count, in the future
* SORT may have options able to overwrite/delete keys during the sorting
* and the sorted key itself may get destroyed */
if (sortval)
incrRefCount(sortval);
else
sortval = createQuicklistObject();
/* When sorting a set with no sort specified, we must sort the output /* When sorting a set with no sort specified, we must sort the output
* so the result is consistent across scripting and replication. * so the result is consistent across scripting and replication.
* *
@ -452,7 +459,7 @@ void sortCommand(client *c) {
robj *byval; robj *byval;
if (sortby) { if (sortby) {
/* lookup value to sort by */ /* lookup value to sort by */
byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); byval = lookupKeyByPattern(c->db,sortby,vector[j].obj,storekey!=NULL);
if (!byval) continue; if (!byval) continue;
} else { } else {
/* use object itself to sort by */ /* use object itself to sort by */
@ -515,7 +522,7 @@ void sortCommand(client *c) {
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
redisSortOperation *sop = ln->value; redisSortOperation *sop = ln->value;
robj *val = lookupKeyByPattern(c->db,sop->pattern, robj *val = lookupKeyByPattern(c->db,sop->pattern,
vector[j].obj); vector[j].obj,storekey!=NULL);
if (sop->type == SORT_OP_GET) { if (sop->type == SORT_OP_GET) {
if (!val) { if (!val) {
@ -545,7 +552,7 @@ void sortCommand(client *c) {
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
redisSortOperation *sop = ln->value; redisSortOperation *sop = ln->value;
robj *val = lookupKeyByPattern(c->db,sop->pattern, robj *val = lookupKeyByPattern(c->db,sop->pattern,
vector[j].obj); vector[j].obj,storekey!=NULL);
if (sop->type == SORT_OP_GET) { if (sop->type == SORT_OP_GET) {
if (!val) val = createStringObject("",0); if (!val) val = createStringObject("",0);

View File

@ -615,6 +615,10 @@ void hincrbyfloatCommand(client *c) {
} }
value += incr; value += incr;
if (isnan(value) || isinf(value)) {
addReplyError(c,"increment would produce NaN or Infinity");
return;
}
char buf[MAX_LONG_DOUBLE_CHARS]; char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),value,1); int len = ld2string(buf,sizeof(buf),value,1);

View File

@ -520,7 +520,7 @@ void lremCommand(client *c) {
if (removed) { if (removed) {
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"lrem",c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
} }
if (listTypeLength(subject) == 0) { if (listTypeLength(subject) == 0) {

View File

@ -415,7 +415,7 @@ void spopWithCountCommand(client *c) {
/* Make sure a key with the name inputted exists, and that it's type is /* Make sure a key with the name inputted exists, and that it's type is
* indeed a set. Otherwise, return nil */ * indeed a set. Otherwise, return nil */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return; == NULL || checkType(c,set,OBJ_SET)) return;
/* If count is zero, serve an empty multibulk ASAP to avoid special /* If count is zero, serve an empty multibulk ASAP to avoid special

View File

@ -2906,7 +2906,10 @@ void genericZrangebylexCommand(client *c, int reverse) {
while (remaining) { while (remaining) {
if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) || if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return; (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) {
zslFreeLexRange(&range);
return;
}
pos += 3; remaining -= 3; pos += 3; remaining -= 3;
} else { } else {
zslFreeLexRange(&range); zslFreeLexRange(&range);
@ -3140,7 +3143,10 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
if (countarg) { if (countarg) {
if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK) if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
return; return;
if (count < 0) count = 1; if (count <= 0) {
addReply(c,shared.emptyarray);
return;
}
} }
/* Check type and break on the first error, otherwise identify candidate. */ /* Check type and break on the first error, otherwise identify candidate. */

View File

@ -447,7 +447,7 @@ int string2l(const char *s, size_t slen, long *lval) {
* a double: no spaces or other characters before or after the string * a double: no spaces or other characters before or after the string
* representing the number are accepted. */ * representing the number are accepted. */
int string2ld(const char *s, size_t slen, long double *dp) { int string2ld(const char *s, size_t slen, long double *dp) {
char buf[256]; char buf[MAX_LONG_DOUBLE_CHARS];
long double value; long double value;
char *eptr; char *eptr;

View File

@ -148,6 +148,10 @@ void *zrealloc(void *ptr, size_t size) {
size_t oldsize; size_t oldsize;
void *newptr; void *newptr;
if (size == 0 && ptr != NULL) {
zfree(ptr);
return NULL;
}
if (ptr == NULL) return zmalloc(size); if (ptr == NULL) return zmalloc(size);
#ifdef HAVE_MALLOC_SIZE #ifdef HAVE_MALLOC_SIZE
oldsize = zmalloc_size(ptr); oldsize = zmalloc_size(ptr);

24
tests/modules/Makefile Normal file
View File

@ -0,0 +1,24 @@
# find the OS
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -shared
else
SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
.SUFFIXES: .c .so .xo .o
all: commandfilter.so
.c.xo:
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
commandfilter.xo: ../../src/redismodule.h
commandfilter.so: commandfilter.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc

View File

@ -0,0 +1,149 @@
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <string.h>
static RedisModuleString *log_key_name;
static const char log_command_name[] = "commandfilter.log";
static const char ping_command_name[] = "commandfilter.ping";
static const char unregister_command_name[] = "commandfilter.unregister";
static int in_log_command = 0;
static RedisModuleCommandFilter *filter = NULL;
int CommandFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
(void) argc;
(void) argv;
RedisModule_ReplyWithLongLong(ctx,
RedisModule_UnregisterCommandFilter(ctx, filter));
return REDISMODULE_OK;
}
int CommandFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
(void) argc;
(void) argv;
RedisModuleCallReply *reply = RedisModule_Call(ctx, "ping", "c", "@log");
if (reply) {
RedisModule_ReplyWithCallReply(ctx, reply);
RedisModule_FreeCallReply(reply);
} else {
RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments");
}
return REDISMODULE_OK;
}
int CommandFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModuleString *s = RedisModule_CreateString(ctx, "", 0);
int i;
for (i = 1; i < argc; i++) {
size_t arglen;
const char *arg = RedisModule_StringPtrLen(argv[i], &arglen);
if (i > 1) RedisModule_StringAppendBuffer(ctx, s, " ", 1);
RedisModule_StringAppendBuffer(ctx, s, arg, arglen);
}
RedisModuleKey *log = RedisModule_OpenKey(ctx, log_key_name, REDISMODULE_WRITE|REDISMODULE_READ);
RedisModule_ListPush(log, REDISMODULE_LIST_HEAD, s);
RedisModule_CloseKey(log);
RedisModule_FreeString(ctx, s);
in_log_command = 1;
size_t cmdlen;
const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen);
RedisModuleCallReply *reply = RedisModule_Call(ctx, cmdname, "v", &argv[2], argc - 2);
if (reply) {
RedisModule_ReplyWithCallReply(ctx, reply);
RedisModule_FreeCallReply(reply);
} else {
RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments");
}
in_log_command = 0;
return REDISMODULE_OK;
}
void CommandFilter_CommandFilter(RedisModuleCommandFilterCtx *filter)
{
if (in_log_command) return; /* don't process our own RM_Call() from CommandFilter_LogCommand() */
/* Fun manipulations:
* - Remove @delme
* - Replace @replaceme
* - Append @insertbefore or @insertafter
* - Prefix with Log command if @log encounterd
*/
int log = 0;
int pos = 0;
while (pos < RedisModule_CommandFilterArgsCount(filter)) {
const RedisModuleString *arg = RedisModule_CommandFilterArgGet(filter, pos);
size_t arg_len;
const char *arg_str = RedisModule_StringPtrLen(arg, &arg_len);
if (arg_len == 6 && !memcmp(arg_str, "@delme", 6)) {
RedisModule_CommandFilterArgDelete(filter, pos);
continue;
}
if (arg_len == 10 && !memcmp(arg_str, "@replaceme", 10)) {
RedisModule_CommandFilterArgReplace(filter, pos,
RedisModule_CreateString(NULL, "--replaced--", 12));
} else if (arg_len == 13 && !memcmp(arg_str, "@insertbefore", 13)) {
RedisModule_CommandFilterArgInsert(filter, pos,
RedisModule_CreateString(NULL, "--inserted-before--", 19));
pos++;
} else if (arg_len == 12 && !memcmp(arg_str, "@insertafter", 12)) {
RedisModule_CommandFilterArgInsert(filter, pos + 1,
RedisModule_CreateString(NULL, "--inserted-after--", 18));
pos++;
} else if (arg_len == 4 && !memcmp(arg_str, "@log", 4)) {
log = 1;
}
pos++;
}
if (log) RedisModule_CommandFilterArgInsert(filter, 0,
RedisModule_CreateString(NULL, log_command_name, sizeof(log_command_name)-1));
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_Init(ctx,"commandfilter",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (argc != 2) {
RedisModule_Log(ctx, "warning", "Log key name not specified");
return REDISMODULE_ERR;
}
long long noself = 0;
log_key_name = RedisModule_CreateStringFromString(ctx, argv[0]);
RedisModule_StringToLongLong(argv[1], &noself);
if (RedisModule_CreateCommand(ctx,log_command_name,
CommandFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,ping_command_name,
CommandFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,unregister_command_name,
CommandFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if ((filter = RedisModule_RegisterCommandFilter(ctx, CommandFilter_CommandFilter,
noself ? REDISMODULE_CMDFILTER_NOSELF : 0))
== NULL) return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -108,4 +108,11 @@ start_server {tags {"acl"}} {
assert_match {*+debug|segfault*} $cmdstr assert_match {*+debug|segfault*} $cmdstr
assert_match {*+acl*} $cmdstr assert_match {*+acl*} $cmdstr
} }
test {ACL #5998 regression: memory leaks adding / removing subcommands} {
r AUTH default ""
r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c
r ACL setuser newuser -debug
# The test framework will detect a leak if any.
}
} }

View File

@ -115,6 +115,34 @@ start_server {tags {"hll"}} {
set e set e
} {*WRONGTYPE*} } {*WRONGTYPE*}
test {Fuzzing dense/sparse encoding: Redis should always detect errors} {
for {set j 0} {$j < 1000} {incr j} {
r del hll
set items {}
set numitems [randomInt 3000]
for {set i 0} {$i < $numitems} {incr i} {
lappend items [expr {rand()}]
}
r pfadd hll {*}$items
# Corrupt it in some random way.
for {set i 0} {$i < 5} {incr i} {
set len [r strlen hll]
set pos [randomInt $len]
set byte [randstring 1 1 binary]
r setrange hll $pos $byte
# Don't modify more bytes 50% of times
if {rand() < 0.5} break
}
# Use the hyperloglog to check if it crashes
# Redis in some way.
catch {
r pfcount hll
}
}
}
test {PFADD, PFCOUNT, PFMERGE type checking works} { test {PFADD, PFCOUNT, PFMERGE type checking works} {
r set foo bar r set foo bar
catch {r pfadd foo 1} e catch {r pfadd foo 1} e

View File

@ -0,0 +1,84 @@
set testmodule [file normalize tests/modules/commandfilter.so]
start_server {tags {"modules"}} {
r module load $testmodule log-key 0
test {Command Filter handles redirected commands} {
r set mykey @log
r lrange log-key 0 -1
} "{set mykey @log}"
test {Command Filter can call RedisModule_CommandFilterArgDelete} {
r rpush mylist elem1 @delme elem2
r lrange mylist 0 -1
} {elem1 elem2}
test {Command Filter can call RedisModule_CommandFilterArgInsert} {
r del mylist
r rpush mylist elem1 @insertbefore elem2 @insertafter elem3
r lrange mylist 0 -1
} {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3}
test {Command Filter can call RedisModule_CommandFilterArgReplace} {
r del mylist
r rpush mylist elem1 @replaceme elem2
r lrange mylist 0 -1
} {elem1 --replaced-- elem2}
test {Command Filter applies on RM_Call() commands} {
r del log-key
r commandfilter.ping
r lrange log-key 0 -1
} "{ping @log}"
test {Command Filter applies on Lua redis.call()} {
r del log-key
r eval "redis.call('ping', '@log')" 0
r lrange log-key 0 -1
} "{ping @log}"
test {Command Filter applies on Lua redis.call() that calls a module} {
r del log-key
r eval "redis.call('commandfilter.ping')" 0
r lrange log-key 0 -1
} "{ping @log}"
test {Command Filter is unregistered implicitly on module unload} {
r del log-key
r module unload commandfilter
r set mykey @log
r lrange log-key 0 -1
} {}
r module load $testmodule log-key 0
test {Command Filter unregister works as expected} {
# Validate reloading succeeded
r del log-key
r set mykey @log
assert_equal "{set mykey @log}" [r lrange log-key 0 -1]
# Unregister
r commandfilter.unregister
r del log-key
r set mykey @log
r lrange log-key 0 -1
} {}
r module unload commandfilter
r module load $testmodule log-key 1
test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} {
r set mykey @log
assert_equal "{set mykey @log}" [r lrange log-key 0 -1]
r del log-key
r commandfilter.ping
assert_equal {} [r lrange log-key 0 -1]
r eval "redis.call('commandfilter.ping')" 0
assert_equal {} [r lrange log-key 0 -1]
}
}