
In modules we fill a set of function pointers defined in redismodule.h, populating a set of APIs that are callable from the module. We use this manual process instead of resorting to dynamic linking so that we have exact control over how we pass the API to the module, and we can even pass different functions for the same name, depending on the API version declared by the module. However, if the function pointers in redismodule.h and the functions defined in module.c have the same name, they conflict, since the core exports its symbols to the module. There is probably some compiler flag trick to avoid this, but in order to be safer in the future and to be more easily compatible with different build systems, this commit changes the internal function prefix from RedisModule_ to RM_, so for example: RM_StringSet() will be exported as RedisModule_StringSet()
1392 lines
51 KiB
C
1392 lines
51 KiB
C
#include "server.h"
|
|
#include "cluster.h"
|
|
#include <dlfcn.h>
|
|
|
|
#define REDISMODULE_CORE 1
|
|
#include "redismodule.h"
|
|
|
|
/* --------------------------------------------------------------------------
 * Private data structures used by the modules system. These data structures
 * are never exposed to Redis Modules directly: a module only ever sees them
 * as void pointers that it passes back to the API functions operating on
 * them.
 * -------------------------------------------------------------------------- */
|
|
|
|
/* This structure represents a module inside the system. */
|
|
struct RedisModule {
|
|
void *handle; /* Module dlopen() handle. */
|
|
char *name; /* Module name. */
|
|
int ver; /* Module version. We use just progressive integers. */
|
|
int apiver; /* Module API version as requested during initialization.*/
|
|
};
|
|
typedef struct RedisModule RedisModule;
|
|
|
|
static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
|
|
|
|
/* One slot of the context->amqueue array. Each slot remembers an object
 * that has to be released when the module callback returns. */
struct AutoMemEntry {
    void *ptr;  /* The object to release. */
    int type;   /* What kind of object 'ptr' is: one of REDISMODULE_AM_*. */
};

/* AutoMemEntry type field values. The first three select which release
 * function RM_AutoMemoryCollect() calls for the entry; FREED entries are
 * skipped. */
#define REDISMODULE_AM_KEY 0
#define REDISMODULE_AM_STRING 1
#define REDISMODULE_AM_REPLY 2
#define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */
|
|
|
|
/* This structure represents the context in which Redis modules operate.
|
|
* Most APIs module can access, get a pointer to the context, so that the API
|
|
* implementation can hold state across calls, or remember what to free after
|
|
* the call and so forth.
|
|
*
|
|
* Note that not all the context structure is always filled with actual values
|
|
* but only the fields needed in a given context. */
|
|
struct RedisModuleCtx {
|
|
void *getapifuncptr; /* NOTE: Must be the first field. */
|
|
struct RedisModule *module; /* Module reference. */
|
|
client *client; /* Client calling a command. */
|
|
struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
|
|
int amqueue_len; /* Number of slots in amqueue. */
|
|
int amqueue_used; /* Number of used slots in amqueue. */
|
|
int flags; /* REDISMODULE_CTX_... flags. */
|
|
};
|
|
typedef struct RedisModuleCtx RedisModuleCtx;
|
|
|
|
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0}
|
|
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
|
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
|
|
|
/* This represents a Redis key opened with RM_OpenKey(). */
|
|
struct RedisModuleKey {
|
|
RedisModuleCtx *ctx;
|
|
redisDb *db;
|
|
robj *key; /* Key name object. */
|
|
robj *value; /* Value object, or NULL if the key was not found. */
|
|
void *iter; /* Iterator. */
|
|
int mode; /* Opening mode. */
|
|
};
|
|
typedef struct RedisModuleKey RedisModuleKey;
|
|
|
|
/* Function pointer type of a function representing a command inside
|
|
* a Redis module. */
|
|
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
|
|
|
|
/* This struct holds the information about a command registered by a module.*/
|
|
struct RedisModuleCommandProxy {
|
|
struct RedisModule *module;
|
|
RedisModuleCmdFunc func;
|
|
struct redisCommand *rediscmd;
|
|
};
|
|
typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
|
|
|
|
#define REDISMODULE_REPLYFLAG_NONE 0
|
|
#define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
|
|
#define REDISMODULE_REPLYFLAG_NESTED (1<<1) /* Nested reply object. No proto
|
|
or struct free. */
|
|
|
|
/* Reply of RM_Call() function. The function is filled in a lazy
|
|
* way depending on the function called on the reply structure. By default
|
|
* only the type and proto are filled. */
|
|
struct RedisModuleCallReply {
|
|
RedisModuleCtx *ctx;
|
|
int type; /* REDISMODULE_REPLY_... */
|
|
int flags; /* REDISMODULE_REPLYFLAG_... */
|
|
size_t len; /* Len of strings or num of elements of arrays. */
|
|
char *proto; /* Raw reply protocol. An SDS string at top-level object. */
|
|
size_t protolen;/* Length of protocol. */
|
|
union {
|
|
const char *str; /* String pointer for string and error replies. This
|
|
does not need to be freed, always points inside
|
|
a reply->proto buffer of the reply object or, in
|
|
case of array elements, of parent reply objects. */
|
|
long long ll; /* Reply value for integer reply. */
|
|
struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
|
|
} val;
|
|
};
|
|
typedef struct RedisModuleCallReply RedisModuleCallReply;
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Prototypes
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
void RM_FreeCallReply(RedisModuleCallReply *reply);
|
|
void RM_CloseKey(RedisModuleKey *key);
|
|
void RM_AutoMemoryCollect(RedisModuleCtx *ctx);
|
|
robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
|
|
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Helpers for modules API implementation
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Create an empty key of the specified type. 'kp' must point to a key object
|
|
* opened for writing where the .value member is set to NULL because the
|
|
* key was found to be non existing.
|
|
*
|
|
* On success REDISMODULE_OK is returned and the key is populated with
|
|
* the value of the specified type. The function fails and returns
|
|
* REDISMODULE_ERR if:
|
|
*
|
|
* 1) The key is not open for writing.
|
|
* 2) The key is not empty.
|
|
* 3) The specified type is unknown.
|
|
*/
|
|
int moduleCreateEmtpyKey(RedisModuleKey *key, int type) {
|
|
robj *obj;
|
|
|
|
/* The key must be open for writing and non existing to proceed. */
|
|
if (!(key->mode & REDISMODULE_WRITE) || key->value)
|
|
return REDISMODULE_ERR;
|
|
|
|
switch(type) {
|
|
case REDISMODULE_KEYTYPE_LIST:
|
|
obj = createQuicklistObject();
|
|
quicklistSetOptions(obj->ptr, server.list_max_ziplist_size,
|
|
server.list_compress_depth);
|
|
break;
|
|
default: return REDISMODULE_ERR;
|
|
}
|
|
dbAdd(key->db,key->key,obj);
|
|
key->value = obj;
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* This function is called in low-level API implementation functions in order
|
|
* to check if the value associated with the key remained empty after an
|
|
* operation that removed elements from an aggregate data type.
|
|
*
|
|
* If this happens, the key is deleted from the DB and the key object state
|
|
* is set to the right one in order to be targeted again by write operations
|
|
* possibly recreating the key if needed.
|
|
*
|
|
* The function returns 1 if the key value object is found empty and is
|
|
* deleted, otherwise 0 is returned. */
|
|
int moduleDelKeyIfEmpty(RedisModuleKey *key) {
|
|
if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0;
|
|
int isempty;
|
|
robj *o = key->value;
|
|
|
|
switch(o->type) {
|
|
case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
|
|
case OBJ_SET: isempty = setTypeSize(o) == 0; break;
|
|
case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
|
|
case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
|
|
default: isempty = 0;
|
|
}
|
|
|
|
if (isempty) {
|
|
dbDelete(key->db,key->key);
|
|
key->value = NULL;
|
|
return 1;
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Service API exported to modules
|
|
*
|
|
* Note that all the exported APIs are called RM_<funcname> in the core
|
|
* and RedisModule_<funcname> in the module side (defined as function
|
|
* pointers in redismodule.h). In this way the dynamic linker does not
|
|
* mess with our global function pointers, overriding it with the symbols
|
|
* defined in the main executable having the same names.
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Lookup the requested module API and store the function pointer into the
|
|
* target pointer. The function returns REDISMODULE_ERR if there is no such
|
|
* named API, otherwise REDISMODULE_OK. */
|
|
int RM_GetApi(const char *funcname, void **targetPtrPtr) {
|
|
dictEntry *he = dictFind(server.moduleapi, funcname);
|
|
if (!he) return REDISMODULE_ERR;
|
|
*targetPtrPtr = dictGetVal(he);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* This Redis command binds the normal Redis command invocation with commands
|
|
* exported by modules. */
|
|
void RedisModuleCommandDispatcher(client *c) {
|
|
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
|
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
|
|
|
ctx.module = cp->module;
|
|
ctx.client = c;
|
|
cp->func(&ctx,(void**)c->argv,c->argc);
|
|
RM_AutoMemoryCollect(&ctx);
|
|
preventCommandPropagation(c);
|
|
|
|
/* Handle the replication of the final EXEC, since whatever a command
|
|
* emits is always wrappered around MULTI/EXEC. */
|
|
if (ctx.flags & REDISMODULE_CTX_MULTI_EMITTED) {
|
|
robj *propargv[1];
|
|
propargv[0] = createStringObject("EXEC",4);
|
|
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
|
decrRefCount(propargv[0]);
|
|
}
|
|
}
|
|
|
|
/* Register a new command in the Redis server, that will be handled by
|
|
* calling the function pointer 'func' using the RedisModule calling
|
|
* convention. The function returns REDISMODULE_ERR if the specified command
|
|
* name is already busy, otherwise REDISMODULE_OK. */
|
|
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc) {
|
|
struct redisCommand *rediscmd;
|
|
RedisModuleCommandProxy *cp;
|
|
sds cmdname = sdsnew(name);
|
|
|
|
/* Check if the command name is busy. */
|
|
if (lookupCommand((char*)name) != NULL) {
|
|
sdsfree(cmdname);
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
/* Create a command "proxy", which is a structure that is referenced
|
|
* in the command table, so that the generic command that works as
|
|
* binidng between modules and Redis, can know what function to call
|
|
* and what the module is.
|
|
*
|
|
* Note that we use the Redis command table 'getkeys_proc' in order to
|
|
* pass a reference to the command proxy structure. */
|
|
cp = zmalloc(sizeof(*cp));
|
|
cp->module = ctx->module;
|
|
cp->func = cmdfunc;
|
|
cp->rediscmd = zmalloc(sizeof(*rediscmd));
|
|
cp->rediscmd->name = cmdname;
|
|
cp->rediscmd->proc = RedisModuleCommandDispatcher;
|
|
cp->rediscmd->arity = -1;
|
|
cp->rediscmd->flags = 0;
|
|
cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)cp;
|
|
cp->rediscmd->firstkey = 1;
|
|
cp->rediscmd->lastkey = 1;
|
|
cp->rediscmd->keystep = 1;
|
|
cp->rediscmd->microseconds = 0;
|
|
cp->rediscmd->calls = 0;
|
|
dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd);
|
|
dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Called by RM_Init() to setup the ctx->module structure. */
|
|
void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver){
|
|
RedisModule *module;
|
|
|
|
if (ctx->module != NULL) return;
|
|
module = zmalloc(sizeof(*module));
|
|
module->name = sdsnew((char*)name);
|
|
module->ver = ver;
|
|
module->apiver = apiver;
|
|
ctx->module = module;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Automatic memory management for modules
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Enable auto memory. */
|
|
void RM_AutoMemory(RedisModuleCtx *ctx) {
|
|
ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
|
|
}
|
|
|
|
/* Add a new object to release automatically when the callback returns. */
|
|
void RM_AutoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) {
|
|
if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
|
|
if (ctx->amqueue_used == ctx->amqueue_len) {
|
|
ctx->amqueue_len *= 2;
|
|
if (ctx->amqueue_len < 16) ctx->amqueue_len = 16;
|
|
ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len);
|
|
}
|
|
ctx->amqueue[ctx->amqueue_used].type = type;
|
|
ctx->amqueue[ctx->amqueue_used].ptr = ptr;
|
|
ctx->amqueue_used++;
|
|
}
|
|
|
|
/* Mark an object as freed in the auto release queue, so that users can still
|
|
* free things manually if they want. */
|
|
void RM_AutoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) {
|
|
if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
|
|
|
|
int j;
|
|
for (j = 0; j < ctx->amqueue_used; j++) {
|
|
if (ctx->amqueue[j].type == type &&
|
|
ctx->amqueue[j].ptr == ptr)
|
|
{
|
|
ctx->amqueue[j].type = REDISMODULE_AM_FREED;
|
|
/* Optimization: if this is the last element, we can
|
|
* reuse it. */
|
|
if (j == ctx->amqueue_used-1) ctx->amqueue_used--;
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Release all the objects in queue. */
|
|
void RM_AutoMemoryCollect(RedisModuleCtx *ctx) {
|
|
if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
|
|
/* Clear the AUTO_MEMORY flag from the context, otherwise the functions
|
|
* we call to free the resources, will try to scan the auto release
|
|
* queue to mark the entries as freed. */
|
|
ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;
|
|
int j;
|
|
for (j = 0; j < ctx->amqueue_used; j++) {
|
|
void *ptr = ctx->amqueue[j].ptr;
|
|
switch(ctx->amqueue[j].type) {
|
|
case REDISMODULE_AM_STRING: decrRefCount(ptr); break;
|
|
case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break;
|
|
case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break;
|
|
}
|
|
}
|
|
ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
|
|
zfree(ctx->amqueue);
|
|
ctx->amqueue = NULL;
|
|
ctx->amqueue_len = 0;
|
|
ctx->amqueue_used = 0;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* String objects APIs
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Create a new module string object. Must be freed with
|
|
* RM_FreeString(), unless automatic memory is enabled. */
|
|
RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len)
|
|
{
|
|
RedisModuleString *o = createStringObject(ptr,len);
|
|
RM_AutoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
|
|
return o;
|
|
}
|
|
|
|
/* Like RM_CreatString, but creates a string starting from a long long
|
|
* integer instea of taking a buffer and length. */
|
|
RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) {
|
|
char buf[LONG_STR_SIZE];
|
|
size_t len = ll2string(buf,sizeof(buf),ll);
|
|
return RM_CreateString(ctx,buf,len);
|
|
}
|
|
|
|
/* Free a module string object obtained with one of the Redis API calls
|
|
* that return new string objects. */
|
|
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|
decrRefCount(str);
|
|
RM_AutoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
|
|
}
|
|
|
|
/* Return the string pointer and length. */
|
|
const char *RM_StringPtrLen(RedisModuleString *str, size_t *len) {
|
|
if (len) *len = sdslen(str->ptr);
|
|
return str->ptr;
|
|
}
|
|
|
|
/* Turn the string into a long long, storing it at *ll if not NULL.
|
|
* Returns REDISMODULE_OK on success. If the string can't be parsed
|
|
* as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
|
|
* is returned. */
|
|
int RM_StringToLongLong(RedisModuleString *str, long long *ll) {
|
|
return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK :
|
|
REDISMODULE_ERR;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Reply APIs
|
|
*
|
|
* Most functions always return REDISMODULE_OK so you can use it with
|
|
* 'return' in order to return from the command implementation with:
|
|
*
|
|
* if (... some condition ...)
|
|
* return RM_ReplyWithLongLong(ctx,mycount);
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Send an error about the number of arguments given to the command. */
|
|
int RM_WrongArity(RedisModuleCtx *ctx) {
|
|
addReplyErrorFormat(ctx->client,
|
|
"wrong number of arguments for '%s' command",
|
|
(char*)ctx->client->argv[0]->ptr);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Send an integer reply with the specified long long value.
|
|
* The function always returns REDISMODULE_OK. */
|
|
int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
|
|
addReplyLongLong(ctx->client,ll);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Reply with an error or simple string (status message). Used to implement
|
|
* ReplyWithSimpleString() and ReplyWithError(). */
|
|
int RM_ReplyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
|
|
sds strmsg = sdsnewlen(prefix,1);
|
|
strmsg = sdscat(strmsg,msg);
|
|
strmsg = sdscatlen(strmsg,"\r\n",2);
|
|
addReplySds(ctx->client,strmsg);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Reply with the error 'err'.
|
|
*
|
|
* Note that 'err' must contain all the error, including
|
|
* the initial error code. The function only provides the initial "-", so
|
|
* the usage is, for example:
|
|
*
|
|
* RM_ReplyWithError(ctx,"ERR Wrong Type");
|
|
*
|
|
* and not just:
|
|
*
|
|
* RM_ReplyWithError(ctx,"Wrong Type");
|
|
*/
|
|
int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
|
|
return RM_ReplyWithStatus(ctx,err,"-");
|
|
}
|
|
|
|
/* Reply with a simple string (+... \r\n in RESP protocol). This replies
|
|
* are suitalbe only when sending a small non-binary string wiht small
|
|
* overhead, like "OK" or similar replies. */
|
|
int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
|
|
return RM_ReplyWithStatus(ctx,msg,"+");
|
|
}
|
|
|
|
/* Reply with an array type of 'len' elements. However 'len' other calls
|
|
* to ReplyWith* style functions must follow in order to emit the elements
|
|
* of the array. */
|
|
int RM_ReplyWithArray(RedisModuleCtx *ctx, int len) {
|
|
addReplyMultiBulkLen(ctx->client,len);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Reply with a bulk string, taking in input a C buffer pointer and length. */
|
|
int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
|
|
addReplyBulkCBuffer(ctx->client,(char*)buf,len);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Reply with a bulk string, taking in input a RedisModuleString object. */
|
|
int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|
addReplyBulk(ctx->client,str);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Commands replication API
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Helper function to replicate MULTI the first time we replicate something
|
|
* in the context of a command execution. EXEC will be handled by the
|
|
* RedisModuleCommandDispatcher() function. */
|
|
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
|
|
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
|
|
execCommandPropagateMulti(ctx->client);
|
|
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
|
|
}
|
|
|
|
/* Replicate the specified command and arguments to slaves and AOF, as effect
|
|
* of execution of the calling command implementation.
|
|
*
|
|
* The replicated commands are always wrapepd into the MULTI/EXEC that
|
|
* contains all the commands replicated in a given module command
|
|
* execution. However the commands replicated with RM_Call()
|
|
* are the first items, the ones replicated with RM_Replicate()
|
|
* will all follow before the EXEC.
|
|
*
|
|
* Modules should try to use one interface or the other. */
|
|
int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
|
|
struct redisCommand *cmd;
|
|
robj **argv = NULL;
|
|
int argc = 0, flags = 0, j;
|
|
va_list ap;
|
|
|
|
cmd = lookupCommandByCString((char*)cmdname);
|
|
if (!cmd) return REDISMODULE_ERR;
|
|
|
|
/* Create the client and dispatch the command. */
|
|
va_start(ap, fmt);
|
|
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
|
|
va_end(ap);
|
|
if (argv == NULL) return REDISMODULE_ERR;
|
|
|
|
/* Replicate! */
|
|
moduleReplicateMultiIfNeeded(ctx);
|
|
alsoPropagate(cmd,ctx->client->db->id,argv,argc,
|
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
|
|
|
/* Release the argv. */
|
|
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
|
|
zfree(argv);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* This function will replicate the command exactly as it was invoked
|
|
* by the client. This function will not wrap the command into
|
|
* a MULTI/EXEC stanza, so it should not be mixed with other replication
|
|
* commands. */
|
|
int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
|
|
alsoPropagate(ctx->client->cmd,ctx->client->db->id,
|
|
ctx->client->argv,ctx->client->argc,
|
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* DB and Key APIs -- Generic API
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Return the currently selected DB. */
|
|
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
|
return ctx->client->db->id;
|
|
}
|
|
|
|
/* Change the currently selected DB. Returns an error if the id
|
|
* is out of range. */
|
|
int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
|
|
int retval = selectDb(ctx->client,newid);
|
|
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
|
|
}
|
|
|
|
/* Return an handle representing a Redis key, so that it is possible
|
|
* to call other APIs with the key handle as argument to perform
|
|
* operations on the key.
|
|
*
|
|
* The return value is the handle repesenting the key, that must be
|
|
* closed with RM_CloseKey().
|
|
*
|
|
* If the key does not exist and WRITE mode is requested, the handle
|
|
* is still returned, since it is possible to perform operations on
|
|
* a yet not existing key (that will be created, for example, after
|
|
* a list push operation). If the mode is just READ instead, and the
|
|
* key does not exist, NULL is returned. However it is still safe to
|
|
* call RM_CloseKey() and RM_KeyType() on a NULL
|
|
* value. */
|
|
void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
|
|
RedisModuleKey *kp;
|
|
robj *value;
|
|
|
|
if (mode & REDISMODULE_WRITE) {
|
|
value = lookupKeyWrite(ctx->client->db,keyname);
|
|
} else {
|
|
value = lookupKeyRead(ctx->client->db,keyname);
|
|
if (value == NULL) {
|
|
decrRefCount(keyname);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
/* Setup the key handle. */
|
|
kp = zmalloc(sizeof(*kp));
|
|
kp->ctx = ctx;
|
|
kp->db = ctx->client->db;
|
|
kp->key = keyname;
|
|
incrRefCount(keyname);
|
|
kp->value = value;
|
|
kp->iter = NULL;
|
|
kp->mode = mode;
|
|
RM_AutoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
|
|
return (void*)kp;
|
|
}
|
|
|
|
/* Close a key handle. */
|
|
void RM_CloseKey(RedisModuleKey *key) {
|
|
if (key == NULL) return;
|
|
if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
|
|
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
|
|
decrRefCount(key->key);
|
|
RM_AutoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
|
|
zfree(key);
|
|
}
|
|
|
|
/* Return the type of the key. If the key pointer is NULL then
|
|
* REDISMODULE_KEYTYPE_EMPTY is returned. */
|
|
int RM_KeyType(RedisModuleKey *key) {
|
|
if (key == NULL || key->value == NULL) return REDISMODULE_KEYTYPE_EMPTY;
|
|
/* We map between defines so that we are free to change the internal
|
|
* defines as desired. */
|
|
switch(key->value->type) {
|
|
case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING;
|
|
case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST;
|
|
case OBJ_SET: return REDISMODULE_KEYTYPE_SET;
|
|
case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
|
|
case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
|
|
default: return 0;
|
|
}
|
|
}
|
|
|
|
/* Return the length of the value associated with the key.
|
|
* For strings this is the length of the string. For all the other types
|
|
* is the number of elements (just counting keys for hashes).
|
|
*
|
|
* If the key pointer is NULL or the key is empty, zero is returned. */
|
|
size_t RM_ValueLength(RedisModuleKey *key) {
|
|
if (key == NULL || key->value == NULL) return 0;
|
|
switch(key->value->type) {
|
|
case OBJ_STRING: return stringObjectLen(key->value);
|
|
case OBJ_LIST: return listTypeLength(key->value);
|
|
case OBJ_SET: return setTypeSize(key->value);
|
|
case OBJ_ZSET: return zsetLength(key->value);
|
|
case OBJ_HASH: return hashTypeLength(key->value);
|
|
default: return 0;
|
|
}
|
|
}
|
|
|
|
/* If the key is open for writing, remove it, and setup the key to
|
|
* accept new writes as an empty key (that will be created on demand).
|
|
* On success REDISMODULE_OK is returned. If the key is not open for
|
|
* writing REDISMODULE_ERR is returned. */
|
|
int RM_DeleteKey(RedisModuleKey *key) {
|
|
if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
|
|
if (key->value) {
|
|
dbDelete(key->db,key->key);
|
|
key->value = NULL;
|
|
}
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Key API for String type
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* If the key is open for writing, set the specified string 'str' as the
|
|
* value of the key, deleting the old value if any.
|
|
* On success REDISMODULE_OK is returned. If the key is not open for
|
|
* writing or there is an active iterator, REDISMODULE_ERR is returned. */
|
|
int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
|
|
if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
|
|
RM_DeleteKey(key);
|
|
setKey(key->db,key->key,str);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Prepare the key associated string value for DMA access, and returns
|
|
* a pointer and size (by reference), that the user can use to read or
|
|
* modify the string in-place accessing it directly via pointer.
|
|
*
|
|
* The 'mode' is composed by bitwise OR-ing the following flags:
|
|
*
|
|
* REDISMODULE_READ -- Read access
|
|
* REDISMODULE_WRITE -- WRite access
|
|
*
|
|
* If the DMA is not requested for writing, the pointer returned should
|
|
* only be accessed in a read-only fashion.
|
|
*
|
|
* On error (wrong type) NULL is returned.
|
|
*
|
|
* DMA access rules:
|
|
*
|
|
* 1. No other key writing function should be called since the moment
|
|
* the pointer is obtained, for all the time we want to use DMA access
|
|
* to read or modify the string.
|
|
*
|
|
* 2. Each time RM_StringTruncate() is called, to continue with the DMA
|
|
* access, RM_StringDMA() should be called again to re-obtain
|
|
* a new pointer and length.
|
|
*
|
|
* 3. If the returned pointer is not NULL, but the length is zero, no
|
|
* byte can be touched (the string is empty, or the key itself is empty)
|
|
* so a RM_StringTruncate() call should be used if there is to enlarge
|
|
* the string, and later call StringDMA() again to get the pointer.
|
|
*/
|
|
char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
|
|
/* We need to return *some* pointer for empty keys, we just return
|
|
* a string literal pointer, that is the advantage to be mapped into
|
|
* a read only memory page, so the module will segfault if a write
|
|
* attempt is performed. */
|
|
char *emptystring = "<dma-empty-string>";
|
|
if (key->value == NULL) {
|
|
*len = 0;
|
|
return emptystring;
|
|
}
|
|
|
|
if (key->value->type != OBJ_STRING) return NULL;
|
|
|
|
/* For write access, and even for read access if the object is encoded,
|
|
* we unshare the string (that has the side effect of decoding it). */
|
|
if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
|
|
key->value = dbUnshareStringValue(key->db, key->key, key->value);
|
|
|
|
*len = sdslen(key->value->ptr);
|
|
return key->value->ptr;
|
|
}
|
|
|
|
/* If the string is open for writing and is of string type, resize it, padding
|
|
* with zero bytes if the new length is greater than the old one.
|
|
*
|
|
* After this call, RM_StringDMA() must be called again to continue
|
|
* DMA access with the new pointer.
|
|
*
|
|
* The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on
|
|
* error, that is, the key is not open for writing, is not a string
|
|
* or resizing for more than 512 MB is requested.
|
|
*
|
|
* If the key is empty, a string key is created with the new string value
|
|
* unless the new length value requested is zero. */
|
|
int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
|
|
if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
|
|
if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR;
|
|
if (newlen > 512*1024*1024) return REDISMODULE_ERR;
|
|
|
|
/* Empty key and new len set to 0. Just return REDISMODULE_OK without
|
|
* doing anything. */
|
|
if (key->value == NULL && newlen == 0) return REDISMODULE_OK;
|
|
|
|
/* Empty key: fill it with a zero-length key so that we can handle the
|
|
* resize with a common code path. */
|
|
if (key->value == NULL) {
|
|
robj *emptyobj = createStringObject("",0);
|
|
setKey(key->db,key->key,emptyobj);
|
|
key->value = emptyobj;
|
|
decrRefCount(emptyobj);
|
|
}
|
|
|
|
/* Unshare and resize. */
|
|
key->value = dbUnshareStringValue(key->db, key->key, key->value);
|
|
size_t curlen = sdslen(key->value->ptr);
|
|
if (newlen > curlen) {
|
|
key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
|
|
} else if (newlen < curlen) {
|
|
sdsrange(key->value->ptr,0,newlen-1);
|
|
/* If the string is too wasteful, reallocate it. */
|
|
if (sdslen(key->value->ptr) > sdsavail(key->value->ptr))
|
|
key->value->ptr = sdsRemoveFreeSpace(key->value->ptr);
|
|
}
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Key API for List type
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Push an element into a list, on head or tail depending on 'where' argumnet.
|
|
* If the key pointer is about an empty key opened for writing, the key
|
|
* is created. On error (key opened for read-only operations or of the wrong
|
|
* type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */
|
|
int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
|
|
if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
|
|
if (key->value == NULL) moduleCreateEmtpyKey(key,REDISMODULE_KEYTYPE_LIST);
|
|
if (key->value->type != OBJ_LIST) return REDISMODULE_ERR;
|
|
listTypePush(key->value, ele,
|
|
(where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
|
|
signalModifiedKey(key->db,key->key);
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Pop an element from the list, and returns it as a module string object
|
|
* that the user should be free with RM_FreeString() or by enabling
|
|
* automatic memory. 'where' specifies if the element should be popped from
|
|
* head or tail. The command returns NULL if:
|
|
* 1) The list is empty.
|
|
* 2) The key was not open for writing.
|
|
* 3) The key is not a list. */
|
|
RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
|
|
if (!(key->mode & REDISMODULE_WRITE) ||
|
|
key->value == NULL ||
|
|
key->value->type != OBJ_LIST) return NULL;
|
|
robj *ele = listTypePop(key->value,
|
|
(where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
|
|
robj *decoded = getDecodedObject(ele);
|
|
decrRefCount(ele);
|
|
moduleDelKeyIfEmpty(key);
|
|
RM_AutoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded);
|
|
return decoded;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Redis <-> Modules generic Call() API
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* Create a new RedisModuleCallReply object. The processing of the reply
|
|
* is lazy, the object is just populated with the raw protocol and later
|
|
* is processed as needed. Initially we just make sure to set the right
|
|
* reply type, which is extremely cheap to do. */
|
|
RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) {
|
|
RedisModuleCallReply *reply = zmalloc(sizeof(*reply));
|
|
reply->ctx = ctx;
|
|
reply->proto = proto;
|
|
reply->protolen = sdslen(proto);
|
|
reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */
|
|
switch(proto[0]) {
|
|
case '$':
|
|
case '+': reply->type = REDISMODULE_REPLY_STRING;
|
|
case '-': reply->type = REDISMODULE_REPLY_ERROR;
|
|
case ':': reply->type = REDISMODULE_REPLY_INTEGER;
|
|
case '*': reply->type = REDISMODULE_REPLY_ARRAY;
|
|
default: reply->type = REDISMODULE_REPLY_UNKNOWN;
|
|
}
|
|
if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-')
|
|
reply->type = REDISMODULE_REPLY_NULL;
|
|
return reply;
|
|
}
|
|
|
|
/* Forward declarations of the per-type parsers used by
 * moduleParseCallReply(). */
void moduleParseCallReply_Int(RedisModuleCallReply *reply);
void moduleParseCallReply_BulkString(RedisModuleCallReply *reply);
void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply);
void moduleParseCallReply_Array(RedisModuleCallReply *reply);
|
|
|
|
/* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise
|
|
* use the protcol of the reply in reply->proto in order to fill the
|
|
* reply with parsed data according to the reply type. */
|
|
void moduleParseCallReply(RedisModuleCallReply *reply) {
|
|
if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return;
|
|
reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE;
|
|
|
|
switch(reply->proto[0]) {
|
|
case ':': moduleParseCallReply_Int(reply); break;
|
|
case '$': moduleParseCallReply_BulkString(reply); break;
|
|
case '-': /* handled by next item. */
|
|
case '+': moduleParseCallReply_SimpleString(reply); break;
|
|
case '*': moduleParseCallReply_Array(reply); break;
|
|
}
|
|
}
|
|
|
|
/* Parse an integer reply (":<number>\r\n"): store the number in
 * reply->val.ll and set reply->protolen to the length of the whole line,
 * CRLF included. */
void moduleParseCallReply_Int(RedisModuleCallReply *reply) {
    char *start = reply->proto;
    char *cr = strchr(start+1,'\r');

    /* The digits sit between the type byte and the '\r'. */
    string2ll(start+1,cr-start-1,&reply->val.ll);
    reply->type = REDISMODULE_REPLY_INTEGER;
    reply->protolen = (cr+2)-start;
}
|
|
|
|
/* Parse a bulk string reply ("$<len>\r\n<payload>\r\n").
 *
 * A length of -1 turns the reply into REDISMODULE_REPLY_NULL. Otherwise
 * reply->val.str points at the payload inside the protocol buffer (no copy
 * is made) and reply->len is the payload length. In both cases
 * reply->protolen is set to the number of protocol bytes consumed. */
void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *p = strchr(proto+1,'\r');
    long long bulklen;

    string2ll(proto+1,p-proto-1,&bulklen);
    if (bulklen == -1) {
        /* Only the header line is present. The length is measured from the
         * start of the protocol up to and including the CRLF, so the
         * subtraction must be p-proto: the opposite order would yield a
         * negative value. */
        reply->protolen = p-proto+2;
        reply->type = REDISMODULE_REPLY_NULL;
    } else {
        reply->val.str = p+2; /* Payload starts right after the CRLF. */
        reply->len = bulklen;
        /* Header line + CRLF + payload + final CRLF. */
        reply->protolen = p-proto+2+bulklen+2;
        reply->type = REDISMODULE_REPLY_STRING;
    }
}
|
|
|
|
/* Parse a single line reply: "+<text>\r\n" (status) or "-<text>\r\n"
 * (error).
 *
 * reply->val.str points at the text inside the protocol buffer (no copy is
 * made), reply->len is the text length without the CRLF, and
 * reply->protolen is the length of the whole line, CRLF included. The
 * type is REDISMODULE_REPLY_STRING for '+' and REDISMODULE_REPLY_ERROR
 * otherwise. */
void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *p = strchr(proto+1,'\r');

    reply->val.str = proto+1;
    reply->len = p-proto-1;
    /* Measured from the start of the protocol up to and including the
     * CRLF: the subtraction must be p-proto, the opposite order would
     * yield a negative value. */
    reply->protolen = p-proto+2;
    reply->type = proto[0] == '+' ? REDISMODULE_REPLY_STRING :
                                    REDISMODULE_REPLY_ERROR;
}
|
|
|
|
/* Parse an array reply ("*<count>\r\n" followed by <count> replies).
 *
 * A count of -1 turns the reply into REDISMODULE_REPLY_NULL. Otherwise an
 * array of 'count' nested replies is allocated in reply->val.array and each
 * element is parsed immediately, since its protocol length is needed to
 * locate the next one. Nested elements are flagged
 * REDISMODULE_REPLYFLAG_NESTED and reference the parent protocol buffer.
 * reply->protolen is set to the total number of protocol bytes consumed. */
void moduleParseCallReply_Array(RedisModuleCallReply *reply) {
    char *proto = reply->proto;
    char *p = strchr(proto+1,'\r');
    long long arraylen, j;

    string2ll(proto+1,p-proto-1,&arraylen);
    p += 2; /* Skip the CRLF terminating the header line. */

    if (arraylen == -1) {
        /* 'p' is past the header: the consumed length is p-proto (the
         * opposite order would yield a negative value). */
        reply->protolen = p-proto;
        reply->type = REDISMODULE_REPLY_NULL;
        return;
    }

    reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen);
    reply->len = arraylen;
    for (j = 0; j < arraylen; j++) {
        RedisModuleCallReply *ele = reply->val.array+j;
        ele->flags = REDISMODULE_REPLYFLAG_NESTED |
                     REDISMODULE_REPLYFLAG_TOPARSE;
        ele->proto = p;
        /* Propagate the context: APIs operating on nested elements read
         * reply->ctx, and the freshly allocated element is not otherwise
         * initialized. */
        ele->ctx = reply->ctx;
        moduleParseCallReply(ele);
        p += ele->protolen;
    }
    reply->protolen = p-proto;
    reply->type = REDISMODULE_REPLY_ARRAY;
}
|
|
|
|
/* Free a Call reply and all the nested replies it contains if it's an
|
|
* array. */
|
|
void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
|
|
/* Don't free nested replies by default: the user must always free the
|
|
* toplevel reply. However be gentle and don't crash if the module
|
|
* misuses the API. */
|
|
if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return;
|
|
|
|
if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) {
|
|
if (reply->type == REDISMODULE_REPLY_ARRAY) {
|
|
size_t j;
|
|
for (j = 0; j < reply->len; j++)
|
|
RM_FreeCallReply_Rec(reply->val.array+j,1);
|
|
zfree(reply->val.array);
|
|
}
|
|
}
|
|
|
|
/* For nested replies, we don't free reply->proto (which if not NULL
|
|
* references the parent reply->proto buffer), nor the structure
|
|
* itself which is allocated as an array of structures, and is freed
|
|
* when the array value is released. */
|
|
if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) {
|
|
if (reply->proto) sdsfree(reply->proto);
|
|
zfree(reply);
|
|
}
|
|
}
|
|
|
|
/* Wrapper for the recursive free reply function. This is needed in order
|
|
* to have the first level function to return on nested replies, but only
|
|
* if called by the module API. */
|
|
void RM_FreeCallReply(RedisModuleCallReply *reply) {
|
|
RM_FreeCallReply_Rec(reply,0);
|
|
RM_AutoMemoryFreed(reply->ctx,REDISMODULE_AM_REPLY,reply);
|
|
}
|
|
|
|
/* Return the reply type. */
|
|
int RM_CallReplyType(RedisModuleCallReply *reply) {
|
|
return reply->type;
|
|
}
|
|
|
|
/* Return the reply type length, where applicable. */
|
|
size_t RM_CallReplyLength(RedisModuleCallReply *reply) {
|
|
moduleParseCallReply(reply);
|
|
switch(reply->type) {
|
|
case REDISMODULE_REPLY_STRING:
|
|
case REDISMODULE_REPLY_ERROR:
|
|
case REDISMODULE_REPLY_ARRAY:
|
|
return reply->len;
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/* Return the 'idx'-th nested call reply element of an array reply, or NULL
|
|
* if the reply type is wrong or the index is out of range. */
|
|
RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) {
|
|
moduleParseCallReply(reply);
|
|
if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL;
|
|
if (idx >= reply->len) return NULL;
|
|
return reply->val.array+idx;
|
|
}
|
|
|
|
/* Return the long long of an integer reply. */
|
|
long long RM_CallReplyInteger(RedisModuleCallReply *reply) {
|
|
moduleParseCallReply(reply);
|
|
if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN;
|
|
return reply->val.ll;
|
|
}
|
|
|
|
/* Return the pointer and length of a string or error reply. */
|
|
const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
|
|
moduleParseCallReply(reply);
|
|
if (reply->type != REDISMODULE_REPLY_STRING &&
|
|
reply->type != REDISMODULE_REPLY_ERROR) return NULL;
|
|
if (len) *len = reply->len;
|
|
return reply->val.str;
|
|
}
|
|
|
|
/* Return a new string object from a call reply of type string, error or
|
|
* integer. Otherwise (wrong reply type) return NULL. */
|
|
RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
|
|
moduleParseCallReply(reply);
|
|
switch(reply->type) {
|
|
case REDISMODULE_REPLY_STRING:
|
|
case REDISMODULE_REPLY_ERROR:
|
|
return RM_CreateString(reply->ctx,reply->val.str,reply->len);
|
|
case REDISMODULE_REPLY_INTEGER: {
|
|
char buf[64];
|
|
int len = ll2string(buf,sizeof(buf),reply->val.ll);
|
|
return RM_CreateString(reply->ctx,buf,len);
|
|
}
|
|
default: return NULL;
|
|
}
|
|
}
|
|
|
|
/* Returns an array of robj pointers, and populates *argc with the number
|
|
* of items, by parsing the format specifier "fmt" as described for
|
|
* the RM_Call(), RM_Replicate() and other module APIs.
|
|
*
|
|
* The integer pointed by 'flags' is populated with flags according
|
|
* to special modifiers in "fmt". For now only one exists:
|
|
*
|
|
* "!" -> REDISMODULE_ARGV_REPLICATE
|
|
*
|
|
* On error (format specifier error) NULL is returned and nothing is
|
|
* allocated. On success the argument vector is returned. */
|
|
|
|
#define REDISMODULE_ARGV_REPLICATE (1<<0)
|
|
|
|
robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
|
|
int argc = 0, argv_size, j;
|
|
robj **argv = NULL;
|
|
|
|
/* As a first guess to avoid useless reallocations, size argv to
|
|
* hold one argument for each char specifier in 'fmt'. */
|
|
argv_size = strlen(fmt)+1; /* +1 because of the command name. */
|
|
argv = zrealloc(argv,sizeof(robj*)*argv_size);
|
|
|
|
/* Build the arguments vector based on the format specifier. */
|
|
argv[0] = createStringObject(cmdname,strlen(cmdname));
|
|
argc++;
|
|
|
|
/* Create the client and dispatch the command. */
|
|
const char *p = fmt;
|
|
while(*p) {
|
|
if (*p == 'c') {
|
|
char *cstr = va_arg(ap,char*);
|
|
argv[argc++] = createStringObject(cstr,strlen(cstr));
|
|
} else if (*p == 's') {
|
|
robj *obj = va_arg(ap,void*);
|
|
argv[argc++] = obj;
|
|
incrRefCount(obj);
|
|
} else if (*p == 'b') {
|
|
char *buf = va_arg(ap,char*);
|
|
size_t len = va_arg(ap,size_t);
|
|
argv[argc++] = createStringObject(buf,len);
|
|
} else if (*p == 'l') {
|
|
long ll = va_arg(ap,long long);
|
|
argv[argc++] = createStringObjectFromLongLong(ll);
|
|
} else if (*p == 'v') {
|
|
/* TODO: work in progress. */
|
|
} else if (*p == '!') {
|
|
if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
|
|
} else {
|
|
goto fmterr;
|
|
}
|
|
p++;
|
|
}
|
|
*argcp = argc;
|
|
return argv;
|
|
|
|
fmterr:
|
|
for (j = 0; j < argc; j++)
|
|
decrRefCount(argv[j]);
|
|
zfree(argv);
|
|
return NULL;
|
|
}
|
|
|
|
/* Exported API to call any Redis command from modules.
|
|
* On success a RedisModuleCallReply object is returned, otherwise
|
|
* NULL is returned and errno is set to the following values:
|
|
*
|
|
* EINVAL: command non existing, wrong arity, wrong format specifier.
|
|
* EPERM: operation in Cluster instance with key in non local slot. */
|
|
RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
|
|
struct redisCommand *cmd;
|
|
client *c = NULL;
|
|
robj **argv = NULL;
|
|
int argc = 0, flags = 0;
|
|
va_list ap;
|
|
RedisModuleCallReply *reply = NULL;
|
|
int replicate = 0; /* Replicate this command? */
|
|
|
|
cmd = lookupCommandByCString((char*)cmdname);
|
|
if (!cmd) {
|
|
errno = EINVAL;
|
|
return NULL;
|
|
}
|
|
|
|
/* Create the client and dispatch the command. */
|
|
va_start(ap, fmt);
|
|
c = createClient(-1);
|
|
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
|
|
replicate = flags & REDISMODULE_ARGV_REPLICATE;
|
|
va_end(ap);
|
|
|
|
/* Setup our fake client for command execution. */
|
|
c->flags |= CLIENT_MODULE;
|
|
c->argv = argv;
|
|
c->argc = argc;
|
|
c->cmd = c->lastcmd = cmd;
|
|
/* We handle the above format error only when the client is setup so that
|
|
* we can free it normally. */
|
|
if (argv == NULL) goto cleanup;
|
|
|
|
/* Basic arity checks. */
|
|
if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
|
|
errno = EINVAL;
|
|
goto cleanup;
|
|
}
|
|
|
|
/* If this is a Redis Cluster node, we need to make sure the module is not
|
|
* trying to access non-local keys, with the exception of commands
|
|
* received from our master. */
|
|
if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
|
|
/* Duplicate relevant flags in the module client. */
|
|
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
|
|
c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
|
|
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) !=
|
|
server.cluster->myself)
|
|
{
|
|
errno = EPERM;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
/* If we are using single commands replication, we need to wrap what
|
|
* we propagate into a MULTI/EXEC block, so that it will be atomic like
|
|
* a Lua script in the context of AOF and slaves. */
|
|
if (replicate) moduleReplicateMultiIfNeeded(ctx);
|
|
|
|
/* Run the command */
|
|
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
|
if (replicate) {
|
|
call_flags |= CMD_CALL_PROPAGATE_AOF;
|
|
call_flags |= CMD_CALL_PROPAGATE_REPL;
|
|
}
|
|
call(c,call_flags);
|
|
|
|
/* Convert the result of the Redis command into a suitable Lua type.
|
|
* The first thing we need is to create a single string from the client
|
|
* output buffers. */
|
|
sds proto = sdsnewlen(c->buf,c->bufpos);
|
|
c->bufpos = 0;
|
|
while(listLength(c->reply)) {
|
|
sds o = listNodeValue(listFirst(c->reply));
|
|
|
|
proto = sdscatsds(proto,o);
|
|
listDelNode(c->reply,listFirst(c->reply));
|
|
}
|
|
reply = moduleCreateCallReplyFromProto(ctx,proto);
|
|
RM_AutoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
|
|
|
|
cleanup:
|
|
freeClient(c);
|
|
return reply;
|
|
}
|
|
|
|
/* Return a pointer, and a length, to the protocol returned by the command
|
|
* that returned the reply object. */
|
|
const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) {
|
|
if (reply->proto) *len = sdslen(reply->proto);
|
|
return reply->proto;
|
|
}
|
|
|
|
/* --------------------------------------------------------------------------
|
|
* Modules API internals
|
|
* -------------------------------------------------------------------------- */
|
|
|
|
/* server.moduleapi dictionary type. Only uses plain C strings since
 * this gets queries from modules. */

/* Hash a key that is a null terminated C string. */
unsigned int dictCStringKeyHash(const void *key) {
    const char *str = key;
    return dictGenHashFunction((unsigned char*)str, strlen(str));
}
|
|
|
|
/* Compare two keys that are null terminated C strings. Returns 1 if they
 * are equal, 0 otherwise. 'privdata' is not used. */
int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) {
    DICT_NOTUSED(privdata);
    int differs = strcmp(key1,key2);
    return !differs;
}
|
|
|
|
dictType moduleAPIDictType = {
|
|
dictCStringKeyHash, /* hash function */
|
|
NULL, /* key dup */
|
|
NULL, /* val dup */
|
|
dictCStringKeyCompare, /* key compare */
|
|
NULL, /* key destructor */
|
|
NULL /* val destructor */
|
|
};
|
|
|
|
int moduleRegisterApi(const char *funcname, void *funcptr) {
|
|
return dictAdd(server.moduleapi, (char*)funcname, funcptr);
|
|
}
|
|
|
|
/* Register the core function RM_<name> with the exported name
 * "RedisModule_<name>". */
#define REGISTER_API(name) \
    moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)
|
|
|
|
/* Register all the APIs we export. */
|
|
void moduleRegisterCoreAPI(void) {
|
|
server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
|
|
REGISTER_API(CreateCommand);
|
|
REGISTER_API(SetModuleAttribs);
|
|
REGISTER_API(WrongArity);
|
|
REGISTER_API(ReplyWithLongLong);
|
|
REGISTER_API(ReplyWithError);
|
|
REGISTER_API(ReplyWithSimpleString);
|
|
REGISTER_API(ReplyWithArray);
|
|
REGISTER_API(ReplyWithString);
|
|
REGISTER_API(ReplyWithStringBuffer);
|
|
REGISTER_API(GetSelectedDb);
|
|
REGISTER_API(SelectDb);
|
|
REGISTER_API(OpenKey);
|
|
REGISTER_API(CloseKey);
|
|
REGISTER_API(KeyType);
|
|
REGISTER_API(ValueLength);
|
|
REGISTER_API(ListPush);
|
|
REGISTER_API(ListPop);
|
|
REGISTER_API(StringToLongLong);
|
|
REGISTER_API(Call);
|
|
REGISTER_API(CallReplyProto);
|
|
REGISTER_API(FreeCallReply);
|
|
REGISTER_API(CallReplyInteger);
|
|
REGISTER_API(CallReplyType);
|
|
REGISTER_API(CallReplyLength);
|
|
REGISTER_API(CallReplyArrayElement);
|
|
REGISTER_API(CallReplyStringPtr);
|
|
REGISTER_API(CreateStringFromCallReply);
|
|
REGISTER_API(CreateString);
|
|
REGISTER_API(CreateStringFromLongLong);
|
|
REGISTER_API(FreeString);
|
|
REGISTER_API(StringPtrLen);
|
|
REGISTER_API(AutoMemory);
|
|
REGISTER_API(Replicate);
|
|
REGISTER_API(ReplicateVerbatim);
|
|
REGISTER_API(DeleteKey);
|
|
REGISTER_API(StringSet);
|
|
REGISTER_API(StringDMA);
|
|
REGISTER_API(StringTruncate);
|
|
}
|
|
|
|
/* Global initialization at Redis startup. */
|
|
void moduleInitModulesSystem(void) {
|
|
server.loadmodule_queue = listCreate();
|
|
modules = dictCreate(&modulesDictType,NULL);
|
|
moduleRegisterCoreAPI();
|
|
}
|
|
|
|
/* Load all the modules in the server.loadmodule_queue list, which is
|
|
* populated by `loadmodule` directives in the configuration file.
|
|
* We can't load modules directly when processing the configuration file
|
|
* because the server must be fully initialized before loading modules.
|
|
*
|
|
* The function aborts the server on errors, since to start with missing
|
|
* modules is not considered sane: clients may rely on the existance of
|
|
* given commands, loading AOF also may need some modules to exist, and
|
|
* if this instance is a slave, it must understand commands from master. */
|
|
void moduleLoadFromQueue(void) {
|
|
listIter li;
|
|
listNode *ln;
|
|
|
|
listRewind(server.loadmodule_queue,&li);
|
|
while((ln = listNext(&li))) {
|
|
sds modulepath = ln->value;
|
|
if (moduleLoad(modulepath) == C_ERR) {
|
|
serverLog(LL_WARNING,
|
|
"Can't load module from %s: server aborting",
|
|
modulepath);
|
|
exit(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
void moduleFreeModuleStructure(struct RedisModule *module) {
|
|
sdsfree(module->name);
|
|
zfree(module);
|
|
}
|
|
|
|
/* Load a module and initialize it. On success C_OK is returned, otherwise
|
|
* C_ERR is returned. */
|
|
int moduleLoad(const char *path) {
|
|
int (*onload)(void *);
|
|
void *handle;
|
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
|
|
|
handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
|
|
if (handle == NULL) return C_ERR;
|
|
onload = (int (*)(void *))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
|
|
if (onload == NULL) {
|
|
serverLog(LL_WARNING,
|
|
"Module %s does not export RedisModule_OnLoad() "
|
|
"symbol. Module not loaded.",path);
|
|
return C_ERR;
|
|
}
|
|
if (onload((void*)&ctx) == REDISMODULE_ERR) {
|
|
if (ctx.module) moduleFreeModuleStructure(ctx.module);
|
|
dlclose(handle);
|
|
serverLog(LL_WARNING,
|
|
"Module %s initialization failed. Module not loaded",path);
|
|
return C_ERR;
|
|
}
|
|
|
|
/* Redis module loaded! Register it. */
|
|
dictAdd(modules,ctx.module->name,ctx.module);
|
|
ctx.module->handle = handle;
|
|
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
|
|
return C_OK;
|
|
}
|
|
|
|
/* Unload the module registered with the specified name. On success
|
|
* C_OK is returned, otherwise C_ERR is returned and errno is set
|
|
* to the following values depending on the type of error:
|
|
*
|
|
* ENONET: No such module having the specified name. */
|
|
int moduleUnload(sds name) {
|
|
struct RedisModule *module = dictFetchValue(modules,name);
|
|
if (module == NULL) {
|
|
errno = ENOENT;
|
|
return REDISMODULE_ERR;
|
|
}
|
|
|
|
/* Unregister all the commands registered by this module. */
|
|
dictIterator *di = dictGetSafeIterator(server.commands);
|
|
dictEntry *de;
|
|
while ((de = dictNext(di)) != NULL) {
|
|
struct redisCommand *cmd = dictGetVal(de);
|
|
if (cmd->proc == RedisModuleCommandDispatcher) {
|
|
RedisModuleCommandProxy *cp =
|
|
(void*)(unsigned long)cmd->getkeys_proc;
|
|
sds cmdname = cp->rediscmd->name;
|
|
if (cp->module == module) {
|
|
dictDelete(server.commands,cmdname);
|
|
dictDelete(server.orig_commands,cmdname);
|
|
sdsfree(cmdname);
|
|
zfree(cp->rediscmd);
|
|
zfree(cp);
|
|
}
|
|
}
|
|
}
|
|
dictReleaseIterator(di);
|
|
|
|
/* Unregister all the hooks. TODO: Yet no hooks support here. */
|
|
|
|
/* Unload the dynamic library. */
|
|
if (dlclose(module->handle) == -1) {
|
|
char *error = dlerror();
|
|
if (error == NULL) error = "Unknown error";
|
|
serverLog(LL_WARNING,"Error when trying to close the %s module: %s",
|
|
module->name, error);
|
|
}
|
|
|
|
/* Remove from list of modules. */
|
|
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
|
|
dictDelete(modules,module->name);
|
|
|
|
/* Free the module structure. */
|
|
zfree(module);
|
|
|
|
return REDISMODULE_OK;
|
|
}
|
|
|
|
/* Redis MODULE command.
|
|
*
|
|
* MODULE LOAD <path> */
|
|
void moduleCommand(client *c) {
|
|
char *subcmd = c->argv[1]->ptr;
|
|
|
|
if (!strcasecmp(subcmd,"load") && c->argc == 3) {
|
|
if (moduleLoad(c->argv[2]->ptr) == C_OK)
|
|
addReply(c,shared.ok);
|
|
else
|
|
addReplyError(c,
|
|
"Error loading the extension. Please check the server logs.");
|
|
} else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
|
|
if (moduleUnload(c->argv[2]->ptr) == C_OK)
|
|
addReply(c,shared.ok);
|
|
else {
|
|
char *errmsg = "operation not possible.";
|
|
switch(errno) {
|
|
case ENOENT: errmsg = "no such module with that name";
|
|
}
|
|
addReplyErrorFormat(c,"Error unloading module: %s",errmsg);
|
|
}
|
|
} else if (!strcasecmp(subcmd,"list") && c->argc == 2) {
|
|
dictIterator *di = dictGetIterator(modules);
|
|
dictEntry *de;
|
|
|
|
addReplyMultiBulkLen(c,dictSize(modules));
|
|
while ((de = dictNext(di)) != NULL) {
|
|
sds name = dictGetKey(de);
|
|
struct RedisModule *module = dictGetVal(de);
|
|
addReplyMultiBulkLen(c,4);
|
|
addReplyBulkCString(c,"name");
|
|
addReplyBulkCBuffer(c,name,sdslen(name));
|
|
addReplyBulkCString(c,"ver");
|
|
addReplyLongLong(c,module->ver);
|
|
}
|
|
dictReleaseIterator(di);
|
|
} else {
|
|
addReply(c,shared.syntaxerr);
|
|
}
|
|
}
|