Refactor: move all valkey modules related declarations to module.h (#1489)

In this commit we move all structures and functions declarations related
to Valkey modules from `server.h` to the recently added `module.h` file.

This re-organization makes it easier for new contributors to find the
valkey modules related code, as well as reducing the compilation times
when changes are made to the modules code.

---------

Signed-off-by: Ricardo Dias <ricardo.dias@percona.com>
This commit is contained in:
Ricardo Dias 2025-01-02 17:35:10 +00:00 committed by GitHub
parent ede4adde7a
commit 8d764f27b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 283 additions and 252 deletions

View File

@ -29,6 +29,7 @@
#include "server.h"
#include "sha256.h"
#include "module.h"
#include <fcntl.h>
#include <ctype.h>

View File

@ -31,6 +31,7 @@
#include "bio.h"
#include "rio.h"
#include "functions.h"
#include "module.h"
#include <signal.h>
#include <fcntl.h>
@ -2161,7 +2162,7 @@ int rewriteModuleObject(rio *r, robj *key, robj *o, int dbid) {
ValkeyModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io, mt, r, key, dbid);
moduleInitIOContext(&io, mt, r, key, dbid);
mt->aof_rewrite(&io, key, mv->value);
if (io.ctx) {
moduleFreeContext(io.ctx);

View File

@ -65,6 +65,7 @@
#include "latency.h"
#include "monotonic.h"
#include "cluster_slot_stats.h"
#include "module.h"
/* forward declarations */
static void unblockClientWaitingData(client *c);

View File

@ -36,6 +36,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "module.h"
#include <ctype.h>

View File

@ -38,6 +38,7 @@
#include "cluster_slot_stats.h"
#include "endianconv.h"
#include "connection.h"
#include "module.h"
#include <stdlib.h>
#include <sys/types.h>

View File

@ -32,6 +32,7 @@
#include "cluster.h"
#include "connection.h"
#include "bio.h"
#include "module.h"
#include <fcntl.h>
#include <sys/stat.h>
@ -371,20 +372,6 @@ void resetServerSaveParams(void) {
server.saveparamslen = 0;
}
void queueLoadModule(sds path, sds *argv, int argc) {
int i;
struct moduleLoadQueueEntry *loadmod;
loadmod = zmalloc(sizeof(struct moduleLoadQueueEntry));
loadmod->argv = argc ? zmalloc(sizeof(robj *) * argc) : NULL;
loadmod->path = sdsnew(path);
loadmod->argc = argc;
for (i = 0; i < argc; i++) {
loadmod->argv[i] = createRawStringObject(argv[i], sdslen(argv[i]));
}
listAddNodeTail(server.loadmodule_queue, loadmod);
}
/* Parse an array of `arg_len` sds strings, validate and populate
* server.client_obuf_limits if valid.
* Used in CONFIG SET and configuration file parsing. */
@ -567,7 +554,7 @@ void loadServerConfigFromString(char *config) {
goto loaderr;
}
} else if (!strcasecmp(argv[0], "loadmodule") && argc >= 2) {
queueLoadModule(argv[1], &argv[2], argc - 2);
moduleEnqueueLoadModule(argv[1], &argv[2], argc - 2);
} else if (strchr(argv[0], '.')) {
if (argc < 2) {
err = "Module config specified without value";
@ -1583,12 +1570,7 @@ void rewriteConfigLoadmoduleOption(struct rewriteConfigState *state) {
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct ValkeyModule *module = dictGetVal(de);
line = sdsnew("loadmodule ");
line = sdscatsds(line, module->loadmod->path);
for (int i = 0; i < module->loadmod->argc; i++) {
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, module->loadmod->argv[i]->ptr);
}
line = moduleLoadQueueEntryToLoadmoduleOptionStr(module, "loadmodule");
rewriteConfigRewriteLine(state, "loadmodule", line, 1);
}
dictReleaseIterator(di);

View File

@ -33,6 +33,7 @@
#include "script.h"
#include "functions.h"
#include "io_threads.h"
#include "module.h"
#include <signal.h>
#include <ctype.h>

View File

@ -38,6 +38,7 @@
#include "threads_mngr.h"
#include "io_threads.h"
#include "sds.h"
#include "module.h"
#include <arpa/inet.h>
#include <signal.h>
@ -263,7 +264,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
ValkeyModuleDigest md = {{0}, {0}, keyobj, db->id};
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitDigestContext(md);
moduleInitDigestContext(&md);
if (mt->digest) {
mt->digest(&md, mv->value);
xorDigest(digest, md.x, sizeof(md.x));

View File

@ -36,6 +36,7 @@
#include "server.h"
#include "hashtable.h"
#include "script.h"
#include "module.h"
#include <stddef.h>
#ifdef HAVE_DEFRAG

View File

@ -55,6 +55,7 @@
typedef struct functionLibInfo functionLibInfo;
/* ValkeyModule type aliases for scripting engine structs and types. */
typedef struct ValkeyModule ValkeyModule;
typedef ValkeyModuleScriptingEngineCtx engineCtx;
typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx;
typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction;

View File

@ -2,6 +2,7 @@
#include "bio.h"
#include "functions.h"
#include "cluster.h"
#include "module.h"
#include <stdatomic.h>

View File

@ -63,6 +63,7 @@
#include "valkeymodule.h"
#include "io_threads.h"
#include "functions.h"
#include "module.h"
#include <dlfcn.h>
#include <sys/stat.h>
#include <sys/wait.h>
@ -75,6 +76,12 @@
* pointers that have an API the module can call with them)
* -------------------------------------------------------------------------- */
struct moduleLoadQueueEntry {
sds path;
int argc;
robj **argv;
};
struct ValkeyModuleInfoCtx {
struct ValkeyModule *module;
dict *requested_sections;
@ -644,6 +651,35 @@ void *VM_PoolAlloc(ValkeyModuleCtx *ctx, size_t bytes) {
* Helpers for modules API implementation
* -------------------------------------------------------------------------- */
void moduleEnqueueLoadModule(sds path, sds *argv, int argc) {
int i;
struct moduleLoadQueueEntry *loadmod;
loadmod = zmalloc(sizeof(struct moduleLoadQueueEntry));
loadmod->argv = argc ? zmalloc(sizeof(robj *) * argc) : NULL;
loadmod->path = sdsnew(path);
loadmod->argc = argc;
for (i = 0; i < argc; i++) {
loadmod->argv[i] = createRawStringObject(argv[i], sdslen(argv[i]));
}
listAddNodeTail(server.loadmodule_queue, loadmod);
}
sds moduleLoadQueueEntryToLoadmoduleOptionStr(ValkeyModule *module,
const char *config_option_str) {
sds line;
line = sdsnew(config_option_str);
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, module->loadmod->path);
for (int i = 0; i < module->loadmod->argc; i++) {
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, module->loadmod->argv[i]->ptr);
}
return line;
}
client *moduleAllocTempClient(void) {
client *c = NULL;
@ -7401,7 +7437,7 @@ void *VM_LoadDataTypeFromStringEncver(const ValkeyModuleString *str, const modul
void *ret;
rioInitWithBuffer(&payload, str->ptr);
moduleInitIOContext(io, (moduleType *)mt, &payload, NULL, -1);
moduleInitIOContext(&io, (moduleType *)mt, &payload, NULL, -1);
/* All VM_Save*() calls always write a version 2 compatible format, so we
* need to make sure we read the same.
@ -7433,7 +7469,7 @@ ValkeyModuleString *VM_SaveDataTypeToString(ValkeyModuleCtx *ctx, void *data, co
ValkeyModuleIO io;
rioInitWithBuffer(&payload, sdsempty());
moduleInitIOContext(io, (moduleType *)mt, &payload, NULL, -1);
moduleInitIOContext(&io, (moduleType *)mt, &payload, NULL, -1);
mt->rdb_save(&io, data);
if (io.ctx) {
moduleFreeContext(io.ctx);

View File

@ -5,13 +5,228 @@
* not part of the module API, but are used by the core to interact with modules
*/
typedef struct ValkeyModuleCtx ValkeyModuleCtx;
typedef struct ValkeyModule ValkeyModule;
/* Extract encver / signature from a module type ID. */
#define VALKEYMODULE_TYPE_ENCVER_BITS 10
#define VALKEYMODULE_TYPE_ENCVER_MASK ((1 << VALKEYMODULE_TYPE_ENCVER_BITS) - 1)
#define VALKEYMODULE_TYPE_ENCVER(id) ((id) & VALKEYMODULE_TYPE_ENCVER_MASK)
#define VALKEYMODULE_TYPE_SIGN(id) \
(((id) & ~((uint64_t)VALKEYMODULE_TYPE_ENCVER_MASK)) >> VALKEYMODULE_TYPE_ENCVER_BITS)
/* Bit flags for moduleTypeAuxSaveFunc */
#define VALKEYMODULE_AUX_BEFORE_RDB (1 << 0)
#define VALKEYMODULE_AUX_AFTER_RDB (1 << 1)
struct ValkeyModule;
struct ValkeyModuleIO;
struct ValkeyModuleDigest;
struct ValkeyModuleCtx;
struct moduleLoadQueueEntry;
struct ValkeyModuleKeyOptCtx;
struct ValkeyModuleCommand;
struct clusterState;
/* Each module type implementation should export a set of methods in order
* to serialize and deserialize the value in the RDB file, rewrite the AOF
* log, create the digest for "DEBUG DIGEST", and free the value when a key
* is deleted. */
typedef void *(*moduleTypeLoadFunc)(struct ValkeyModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct ValkeyModuleIO *io, void *value);
typedef int (*moduleTypeAuxLoadFunc)(struct ValkeyModuleIO *rdb, int encver, int when);
typedef void (*moduleTypeAuxSaveFunc)(struct ValkeyModuleIO *rdb, int when);
typedef void (*moduleTypeRewriteFunc)(struct ValkeyModuleIO *io, struct serverObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct ValkeyModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
typedef size_t (*moduleTypeFreeEffortFunc)(struct serverObject *key, const void *value);
typedef void (*moduleTypeUnlinkFunc)(struct serverObject *key, void *value);
typedef void *(*moduleTypeCopyFunc)(struct serverObject *fromkey, struct serverObject *tokey, const void *value);
typedef int (*moduleTypeDefragFunc)(struct ValkeyModuleDefragCtx *ctx, struct serverObject *key, void **value);
typedef size_t (*moduleTypeMemUsageFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value, size_t sample_size);
typedef void (*moduleTypeFreeFunc2)(struct ValkeyModuleKeyOptCtx *ctx, void *value);
typedef size_t (*moduleTypeFreeEffortFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value);
typedef void (*moduleTypeUnlinkFunc2)(struct ValkeyModuleKeyOptCtx *ctx, void *value);
typedef void *(*moduleTypeCopyFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value);
typedef int (*moduleTypeAuthCallback)(struct ValkeyModuleCtx *ctx, void *username, void *password, const char **err);
/* The module type, which is referenced in each value of a given type, defines
* the methods and links to the module exporting the type. */
typedef struct ValkeyModuleType {
uint64_t id; /* Higher 54 bits of type ID + 10 lower bits of encoding ver. */
struct ValkeyModule *module;
moduleTypeLoadFunc rdb_load;
moduleTypeSaveFunc rdb_save;
moduleTypeRewriteFunc aof_rewrite;
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
moduleTypeCopyFunc copy;
moduleTypeDefragFunc defrag;
moduleTypeAuxLoadFunc aux_load;
moduleTypeAuxSaveFunc aux_save;
moduleTypeMemUsageFunc2 mem_usage2;
moduleTypeFreeEffortFunc2 free_effort2;
moduleTypeUnlinkFunc2 unlink2;
moduleTypeCopyFunc2 copy2;
moduleTypeAuxSaveFunc aux_save2;
int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
/* In Object 'robj' structures of type OBJ_MODULE, the value pointer
* is set to the following structure, referencing the moduleType structure
* in order to work with the value, and at the same time providing a raw
* pointer to the value, as created by the module commands operating with
* the module type.
*
* So for example in order to free such a value, it is possible to use
* the following code:
*
* if (robj->type == OBJ_MODULE) {
* moduleValue *mt = robj->ptr;
* mt->type->free(mt->value);
* zfree(mt); // We need to release this in-the-middle struct as well.
* }
*/
typedef struct moduleValue {
moduleType *type;
void *value;
} moduleValue;
/* This structure represents a module inside the system. */
typedef struct ValkeyModule {
void *handle; /* Module dlopen() handle. */
char *name; /* Module name. */
int ver; /* Module version. We use just progressive integers. */
int apiver; /* Module API version as requested during initialization.*/
list *types; /* Module data types. */
list *usedby; /* List of modules using APIs from this one. */
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
list *module_configs; /* List of configurations the module has registered */
int configs_initialized; /* Have the module configurations been initialized? */
int in_call; /* RM_Call() nesting level */
int in_hook; /* Hooks callback nesting level for this module (0 or 1). */
int options; /* Module options and capabilities. */
int blocked_clients; /* Count of ValkeyModuleBlockedClient in this module. */
ValkeyModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
ValkeyModuleDefragFunc defrag_cb; /* Callback for global data defrag. */
struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */
int num_commands_with_acl_categories; /* Number of commands in this module included in acl categories */
int onload; /* Flag to identify if the call is being made from Onload (0 or 1) */
size_t num_acl_categories_added; /* Number of acl categories added by this module. */
} ValkeyModule;
/* This is a wrapper for the 'rio' streams used inside rdb.c in the server, so that
* the user does not have to take the total count of the written bytes nor
* to care about error conditions. */
typedef struct ValkeyModuleIO {
size_t bytes; /* Bytes read / written so far. */
rio *rio; /* Rio stream. */
moduleType *type; /* Module type doing the operation. */
int error; /* True if error condition happened. */
ValkeyModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
robj *key; /* Optional name of key processed */
int dbid; /* The dbid of the key being processed, -1 when unknown. */
sds pre_flush_buffer; /* A buffer that should be flushed before next write operation
* See rdbSaveSingleModuleAux for more details */
} ValkeyModuleIO;
/* Macro to initialize an IO context. Note that the 'ver' field is populated
* inside rdb.c according to the version of the value to load. */
static inline void moduleInitIOContext(ValkeyModuleIO *iovar,
moduleType *mtype,
rio *rioptr,
robj *keyptr,
int db) {
iovar->rio = rioptr;
iovar->type = mtype;
iovar->bytes = 0;
iovar->error = 0;
iovar->key = keyptr;
iovar->dbid = db;
iovar->ctx = NULL;
iovar->pre_flush_buffer = NULL;
}
/* This is a structure used to export DEBUG DIGEST capabilities to
* modules. We want to capture both the ordered and unordered elements of
* a data structure, so that a digest can be created in a way that correctly
* reflects the values. See the DEBUG DIGEST command implementation for more
* background. */
typedef struct ValkeyModuleDigest {
unsigned char o[20]; /* Ordered elements. */
unsigned char x[20]; /* Xored elements. */
robj *key; /* Optional name of key processed */
int dbid; /* The dbid of the key being processed */
} ValkeyModuleDigest;
/* Just start with a digest composed of all zero bytes. */
static inline void moduleInitDigestContext(ValkeyModuleDigest *mdvar) {
memset(mdvar->o, 0, sizeof(mdvar->o));
memset(mdvar->x, 0, sizeof(mdvar->x));
}
void moduleEnqueueLoadModule(sds path, sds *argv, int argc);
sds moduleLoadQueueEntryToLoadmoduleOptionStr(ValkeyModule *module,
const char *config_option_str);
ValkeyModuleCtx *moduleAllocateContext(void);
void moduleScriptingEngineInitContext(ValkeyModuleCtx *out_ctx,
ValkeyModule *module,
client *client);
void moduleFreeContext(ValkeyModuleCtx *ctx);
void moduleInitModulesSystem(void);
void moduleInitModulesSystemLast(void);
void modulesCron(void);
int moduleLoad(const char *path, void **argv, int argc, int is_loadex);
int moduleUnload(sds name, const char **errmsg);
void moduleLoadFromQueue(void);
int moduleGetCommandKeysViaAPI(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
int moduleGetCommandChannelsViaAPI(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
moduleType *moduleTypeLookupModuleByID(uint64_t id);
moduleType *moduleTypeLookupModuleByName(const char *name);
moduleType *moduleTypeLookupModuleByNameIgnoreCase(const char *name);
void moduleTypeNameByID(char *name, uint64_t moduleid);
const char *moduleTypeModuleName(moduleType *mt);
const char *moduleNameFromCommand(struct serverCommand *cmd);
void moduleFreeContext(ValkeyModuleCtx *ctx);
void moduleCallCommandUnblockedHandler(client *c);
int isModuleClientUnblocked(client *c);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c, int from_module);
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs(void);
void moduleCallCommandFilters(client *c);
void modulePostExecutionUnitOperations(void);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors(void);
int moduleAllModulesHandleReplAsyncLoad(void);
sds modulesCollectInfo(sds info, dict *sections_dict, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleBlockedClientMayTimeout(client *c);
int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags);
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
int moduleDefragValue(robj *key, robj *obj, int dbid);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid);
void moduleDefragGlobals(void);
void *moduleGetHandleByName(char *modulename);
int moduleIsModuleCommand(void *module_handle, struct serverCommand *cmd);
#endif /* _MODULE_H_ */

View File

@ -35,6 +35,7 @@
#include "fpconv_dtoa.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "module.h"
#include <strings.h>
#include <sys/socket.h>
#include <sys/uio.h>

View File

@ -28,6 +28,7 @@
*/
#include "server.h"
#include "module.h"
/* This file implements keyspace events notification via Pub/Sub and
* described at https://valkey.io/topics/notifications */

View File

@ -34,6 +34,7 @@
#include "intset.h" /* Compact integer set structure */
#include "zmalloc.h"
#include "sds.h"
#include "module.h"
#include <math.h>
#include <ctype.h>

View File

@ -37,6 +37,7 @@
#include "intset.h" /* Compact integer set structure */
#include "bio.h"
#include "zmalloc.h"
#include "module.h"
#include <math.h>
#include <fcntl.h>
@ -1098,7 +1099,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
* to call the right module during loading. */
int retval = rdbSaveLen(rdb, mt->id);
if (retval == -1) return -1;
moduleInitIOContext(io, mt, rdb, key, dbid);
moduleInitIOContext(&io, mt, rdb, key, dbid);
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
@ -1242,7 +1243,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
/* Save a module-specific aux value. */
ValkeyModuleIO io;
int retval = 0;
moduleInitIOContext(io, mt, rdb, NULL, -1);
moduleInitIOContext(&io, mt, rdb, NULL, -1);
/* We save the AUX field header in a temporary buffer so we can support aux_save2 API.
* If aux_save2 is used the buffer will be flushed at the first time the module will perform
@ -2795,7 +2796,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
ValkeyModuleIO io;
robj keyobj;
initStaticStringObject(keyobj, key);
moduleInitIOContext(io, mt, rdb, &keyobj, dbid);
moduleInitIOContext(&io, mt, rdb, &keyobj, dbid);
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
void *ptr = mt->rdb_load(&io, moduleid & 1023);
@ -3221,7 +3222,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
}
ValkeyModuleIO io;
moduleInitIOContext(io, mt, rdb, NULL, -1);
moduleInitIOContext(&io, mt, rdb, NULL, -1);
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
int rc = mt->aux_load(&io, moduleid & 1023, when);

View File

@ -35,6 +35,7 @@
#include "bio.h"
#include "functions.h"
#include "connection.h"
#include "module.h"
#include <memory.h>
#include <sys/time.h>

View File

@ -31,6 +31,7 @@
#include "script.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "module.h"
scriptFlag scripts_flags_def[] = {
{.flag = SCRIPT_FLAG_NO_WRITES, .str = "no-writes"},

View File

@ -42,6 +42,7 @@
#include "fmtargs.h"
#include "io_threads.h"
#include "sds.h"
#include "module.h"
#include <time.h>
#include <signal.h>

View File

@ -701,168 +701,7 @@ typedef enum {
#define OBJ_STREAM 6 /* Stream object. */
#define OBJ_TYPE_MAX 7 /* Maximum number of object types */
/* Extract encver / signature from a module type ID. */
#define VALKEYMODULE_TYPE_ENCVER_BITS 10
#define VALKEYMODULE_TYPE_ENCVER_MASK ((1 << VALKEYMODULE_TYPE_ENCVER_BITS) - 1)
#define VALKEYMODULE_TYPE_ENCVER(id) ((id) & VALKEYMODULE_TYPE_ENCVER_MASK)
#define VALKEYMODULE_TYPE_SIGN(id) \
(((id) & ~((uint64_t)VALKEYMODULE_TYPE_ENCVER_MASK)) >> VALKEYMODULE_TYPE_ENCVER_BITS)
/* Bit flags for moduleTypeAuxSaveFunc */
#define VALKEYMODULE_AUX_BEFORE_RDB (1 << 0)
#define VALKEYMODULE_AUX_AFTER_RDB (1 << 1)
struct ValkeyModule;
struct ValkeyModuleIO;
struct ValkeyModuleDigest;
struct ValkeyModuleCtx;
struct moduleLoadQueueEntry;
struct ValkeyModuleKeyOptCtx;
struct ValkeyModuleCommand;
struct clusterState;
/* Each module type implementation should export a set of methods in order
* to serialize and deserialize the value in the RDB file, rewrite the AOF
* log, create the digest for "DEBUG DIGEST", and free the value when a key
* is deleted. */
typedef void *(*moduleTypeLoadFunc)(struct ValkeyModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct ValkeyModuleIO *io, void *value);
typedef int (*moduleTypeAuxLoadFunc)(struct ValkeyModuleIO *rdb, int encver, int when);
typedef void (*moduleTypeAuxSaveFunc)(struct ValkeyModuleIO *rdb, int when);
typedef void (*moduleTypeRewriteFunc)(struct ValkeyModuleIO *io, struct serverObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct ValkeyModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
typedef size_t (*moduleTypeFreeEffortFunc)(struct serverObject *key, const void *value);
typedef void (*moduleTypeUnlinkFunc)(struct serverObject *key, void *value);
typedef void *(*moduleTypeCopyFunc)(struct serverObject *fromkey, struct serverObject *tokey, const void *value);
typedef int (*moduleTypeDefragFunc)(struct ValkeyModuleDefragCtx *ctx, struct serverObject *key, void **value);
typedef size_t (*moduleTypeMemUsageFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value, size_t sample_size);
typedef void (*moduleTypeFreeFunc2)(struct ValkeyModuleKeyOptCtx *ctx, void *value);
typedef size_t (*moduleTypeFreeEffortFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value);
typedef void (*moduleTypeUnlinkFunc2)(struct ValkeyModuleKeyOptCtx *ctx, void *value);
typedef void *(*moduleTypeCopyFunc2)(struct ValkeyModuleKeyOptCtx *ctx, const void *value);
typedef int (*moduleTypeAuthCallback)(struct ValkeyModuleCtx *ctx, void *username, void *password, const char **err);
/* The module type, which is referenced in each value of a given type, defines
* the methods and links to the module exporting the type. */
typedef struct ValkeyModuleType {
uint64_t id; /* Higher 54 bits of type ID + 10 lower bits of encoding ver. */
struct ValkeyModule *module;
moduleTypeLoadFunc rdb_load;
moduleTypeSaveFunc rdb_save;
moduleTypeRewriteFunc aof_rewrite;
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
moduleTypeCopyFunc copy;
moduleTypeDefragFunc defrag;
moduleTypeAuxLoadFunc aux_load;
moduleTypeAuxSaveFunc aux_save;
moduleTypeMemUsageFunc2 mem_usage2;
moduleTypeFreeEffortFunc2 free_effort2;
moduleTypeUnlinkFunc2 unlink2;
moduleTypeCopyFunc2 copy2;
moduleTypeAuxSaveFunc aux_save2;
int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
/* In Object 'robj' structures of type OBJ_MODULE, the value pointer
* is set to the following structure, referencing the moduleType structure
* in order to work with the value, and at the same time providing a raw
* pointer to the value, as created by the module commands operating with
* the module type.
*
* So for example in order to free such a value, it is possible to use
* the following code:
*
* if (robj->type == OBJ_MODULE) {
* moduleValue *mt = robj->ptr;
* mt->type->free(mt->value);
* zfree(mt); // We need to release this in-the-middle struct as well.
* }
*/
typedef struct moduleValue {
moduleType *type;
void *value;
} moduleValue;
/* This structure represents a module inside the system. */
struct ValkeyModule {
void *handle; /* Module dlopen() handle. */
char *name; /* Module name. */
int ver; /* Module version. We use just progressive integers. */
int apiver; /* Module API version as requested during initialization.*/
list *types; /* Module data types. */
list *usedby; /* List of modules using APIs from this one. */
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
list *module_configs; /* List of configurations the module has registered */
int configs_initialized; /* Have the module configurations been initialized? */
int in_call; /* RM_Call() nesting level */
int in_hook; /* Hooks callback nesting level for this module (0 or 1). */
int options; /* Module options and capabilities. */
int blocked_clients; /* Count of ValkeyModuleBlockedClient in this module. */
ValkeyModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
ValkeyModuleDefragFunc defrag_cb; /* Callback for global data defrag. */
struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */
int num_commands_with_acl_categories; /* Number of commands in this module included in acl categories */
int onload; /* Flag to identify if the call is being made from Onload (0 or 1) */
size_t num_acl_categories_added; /* Number of acl categories added by this module. */
};
typedef struct ValkeyModule ValkeyModule;
/* This is a wrapper for the 'rio' streams used inside rdb.c in the server, so that
* the user does not have to take the total count of the written bytes nor
* to care about error conditions. */
struct ValkeyModuleIO {
size_t bytes; /* Bytes read / written so far. */
rio *rio; /* Rio stream. */
moduleType *type; /* Module type doing the operation. */
int error; /* True if error condition happened. */
struct ValkeyModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
struct serverObject *key; /* Optional name of key processed */
int dbid; /* The dbid of the key being processed, -1 when unknown. */
sds pre_flush_buffer; /* A buffer that should be flushed before next write operation
* See rdbSaveSingleModuleAux for more details */
};
/* Macro to initialize an IO context. Note that the 'ver' field is populated
* inside rdb.c according to the version of the value to load. */
#define moduleInitIOContext(iovar, mtype, rioptr, keyptr, db) \
do { \
iovar.rio = rioptr; \
iovar.type = mtype; \
iovar.bytes = 0; \
iovar.error = 0; \
iovar.key = keyptr; \
iovar.dbid = db; \
iovar.ctx = NULL; \
iovar.pre_flush_buffer = NULL; \
} while (0)
/* This is a structure used to export DEBUG DIGEST capabilities to
* modules. We want to capture both the ordered and unordered elements of
* a data structure, so that a digest can be created in a way that correctly
* reflects the values. See the DEBUG DIGEST command implementation for more
* background. */
struct ValkeyModuleDigest {
unsigned char o[20]; /* Ordered elements. */
unsigned char x[20]; /* Xored elements. */
struct serverObject *key; /* Optional name of key processed */
int dbid; /* The dbid of the key being processed */
};
/* Just start with a digest composed of all zero bytes. */
#define moduleInitDigestContext(mdvar) \
do { \
memset(mdvar.o, 0, sizeof(mdvar.o)); \
memset(mdvar.x, 0, sizeof(mdvar.x)); \
} while (0)
typedef struct ValkeyModuleType moduleType;
/* Macro to check if the client is in the middle of module based authentication. */
#define clientHasModuleAuthInProgress(c) ((c)->module_auth_ctx != NULL)
@ -1418,12 +1257,6 @@ struct saveparam {
int changes;
};
struct moduleLoadQueueEntry {
sds path;
int argc;
robj **argv;
};
struct sentinelLoadQueueEntry {
int argc;
sds *argv;
@ -2716,59 +2549,6 @@ extern dict *modules;
/* Command metadata */
void populateCommandLegacyRangeSpec(struct serverCommand *c);
/* Modules */
void moduleInitModulesSystem(void);
void moduleInitModulesSystemLast(void);
void modulesCron(void);
int moduleLoad(const char *path, void **argv, int argc, int is_loadex);
int moduleUnload(sds name, const char **errmsg);
void moduleLoadFromQueue(void);
int moduleGetCommandKeysViaAPI(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
int moduleGetCommandChannelsViaAPI(struct serverCommand *cmd, robj **argv, int argc, getKeysResult *result);
moduleType *moduleTypeLookupModuleByID(uint64_t id);
moduleType *moduleTypeLookupModuleByName(const char *name);
moduleType *moduleTypeLookupModuleByNameIgnoreCase(const char *name);
void moduleTypeNameByID(char *name, uint64_t moduleid);
const char *moduleTypeModuleName(moduleType *mt);
const char *moduleNameFromCommand(struct serverCommand *cmd);
void moduleFreeContext(struct ValkeyModuleCtx *ctx);
void moduleCallCommandUnblockedHandler(client *c);
int isModuleClientUnblocked(client *c);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c, int from_module);
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs(void);
void moduleCallCommandFilters(client *c);
void modulePostExecutionUnitOperations(void);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors(void);
int moduleAllModulesHandleReplAsyncLoad(void);
sds modulesCollectInfo(sds info, dict *sections_dict, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleBlockedClientMayTimeout(client *c);
int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags);
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
int moduleDefragValue(robj *key, robj *obj, int dbid);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid);
void moduleDefragGlobals(void);
void *moduleGetHandleByName(char *modulename);
int moduleIsModuleCommand(void *module_handle, struct serverCommand *cmd);
/* Utils */
long long ustime(void);
mstime_t mstime(void);

View File

@ -30,6 +30,7 @@
#include "mt19937-64.h"
#include "server.h"
#include "rdb.h"
#include "module.h"
#include <stdarg.h>
#include <sys/time.h>