diff --git a/runtest-moduleapi b/runtest-moduleapi index fdf60886f..da4c815da 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -29,4 +29,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/blockedclient \ --single unit/moduleapi/getkeys \ --single unit/moduleapi/test_lazyfree \ +--single unit/moduleapi/defrag \ "${@}" diff --git a/src/defrag.c b/src/defrag.c index 165e55f65..fc327c506 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -794,6 +794,20 @@ long defragStream(redisDb *db, dictEntry *kde) { return defragged; } +/* Defrag a module key. This is either done immediately or scheduled + * for later. Returns then number of pointers defragged. + */ +long defragModule(redisDb *db, dictEntry *kde) { + robj *obj = dictGetVal(kde); + serverAssert(obj->type == OBJ_MODULE); + long defragged = 0; + + if (!moduleDefragValue(dictGetKey(kde), obj, &defragged)) + defragLater(db, kde); + + return defragged; +} + /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ @@ -865,8 +879,7 @@ long defragKey(redisDb *db, dictEntry *de) { } else if (ob->type == OBJ_STREAM) { defragged += defragStream(db, de); } else if (ob->type == OBJ_MODULE) { - /* Currently defragmenting modules private data types - * is not supported. */ + defragged += defragModule(db, de); } else { serverPanic("Unknown object type"); } @@ -928,6 +941,7 @@ long defragOtherGlobals() { * that remain static for a long time */ defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB); defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL); + defragged += moduleDefragGlobals(); return defragged; } @@ -946,6 +960,8 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { server.stat_active_defrag_hits += scanLaterHash(ob, cursor); } else if (ob->type == OBJ_STREAM) { return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); + } else if (ob->type == OBJ_MODULE) { + return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -1184,4 +1200,15 @@ void activeDefragCycle(void) { /* Not implemented yet. */ } +void *activeDefragAlloc(void *ptr) { + UNUSED(ptr); + return NULL; +} + +robj *activeDefragStringOb(robj *ob, long *defragged) { + UNUSED(ob); + UNUSED(defragged); + return NULL; +} + #endif diff --git a/src/module.c b/src/module.c index be0f939f2..94bdbf641 100644 --- a/src/module.c +++ b/src/module.c @@ -50,6 +50,7 @@ typedef struct RedisModuleInfoCtx { } RedisModuleInfoCtx; typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleDefragFunc)(struct RedisModuleDefragCtx *ctx); /* This structure represents a module inside the system. */ struct RedisModule { @@ -66,6 +67,7 @@ struct RedisModule { int options; /* Module options and capabilities. */ int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */ RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */ + RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */ }; typedef struct RedisModule RedisModule; @@ -875,6 +877,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->in_hook = 0; module->options = 0; module->info_cb = 0; + module->defrag_cb = 0; ctx->module = module; } @@ -3694,9 +3697,10 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * .mem_usage = myType_MemUsageCallBack, * .aux_load = myType_AuxRDBLoadCallBack, * .aux_save = myType_AuxRDBSaveCallBack, - * .free_effort = myType_FreeEffortCallBack - * .unlink = myType_UnlinkCallBack - * .copy = myType_CopyCallback + * .free_effort = myType_FreeEffortCallBack, + * .unlink = myType_UnlinkCallBack, + * .copy = myType_CopyCallback, + * .defrag = myType_DefragCallback * } * * * **rdb_load**: A callback function pointer that loads data from RDB files. @@ -3723,6 +3727,21 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * Note: if the target key exists and is being overwritten, the copy callback will be * called first, followed by a free callback to the value that is being replaced. * + * * **defrag**: A callback function pointer that is used to request the module to defrag + * a key. The module should then iterate pointers and call the relevant RM_Defrag*() + * functions to defragment pointers or complex types. The module should continue + * iterating as long as RM_DefragShouldStop() returns a zero value, and return a + * zero value if finished or non-zero value if more work is left to be done. If more work + * needs to be done, RM_DefragCursorSet() and RM_DefragCursorGet() can be used to track + * this work across different calls. + * Normally, the defrag mechanism invokes the callback without a time limit, so + * RM_DefragShouldStop() always returns zero. The "late defrag" mechanism which has + * a time limit and provides cursor support is used only for keys that are determined + * to have significant internal complexity. To determine this, the defrag mechanism + * uses the free_effort callback and the 'active-defrag-max-scan-fields' config directive. + * NOTE: The value is passed as a void** and the function is expected to update the + * pointer if the top-level value pointer is defragmented and consequentially changes. + * * Note: the module name "AAAAAAAAA" is reserved and produces an error, it * happens to be pretty lame as well. * @@ -3766,6 +3785,7 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeFreeEffortFunc free_effort; moduleTypeUnlinkFunc unlink; moduleTypeCopyFunc copy; + moduleTypeDefragFunc defrag; } v3; } *tms = (struct typemethods*) typemethods_ptr; @@ -3787,6 +3807,7 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, mt->free_effort = tms->v3.free_effort; mt->unlink = tms->v3.unlink; mt->copy = tms->v3.copy; + mt->defrag = tms->v3.defrag; } memcpy(mt->name,name,sizeof(mt->name)); listAddNodeTail(ctx->module->types,mt); @@ -8165,6 +8186,205 @@ int *RM_GetCommandKeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, return res; } +/* The defrag context, used to manage state during calls to the data type + * defrag callback. + */ +typedef struct RedisModuleDefragCtx { + long defragged; + long long int endtime; + unsigned long *cursor; +} RedisModuleDefragCtx; + +/* Register a defrag callback for global data, i.e. anything that the module + * may allocate that is not tied to a specific data type. + */ +int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) { + ctx->module->defrag_cb = cb; + return REDISMODULE_OK; +} + +/* When the data type defrag callback iterates complex structures, this + * function should be called periodically. A zero (false) return + * indicates the callback may continue its work. A non-zero value (true) + * indicates it should stop. + * + * When stopped, the callback may use RM_DefragCursorSet() to store its + * position so it can later use RM_DefragCursorGet() to resume defragging. + * + * When stopped and more work is left to be done, the callback should + * return 1. Otherwise, it should return 0. + * + * NOTE: Modules should consider the frequency in which this function is called, + * so it generally makes sense to do small batches of work in between calls. + */ +int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) { + return (ctx->endtime != 0 && ctx->endtime < ustime()); +} + +/* Store an arbitrary cursor value for future re-use. + * + * This should only be called if RM_DefragShouldStop() has returned a non-zero + * value and the defrag callback is about to exit without fully iterating its + * data type. + * + * This behavior is reserved to cases where late defrag is performed. Late + * defrag is selected for keys that implement the free_effort callback and + * return a free_effort value that is larger than the defrag + * 'active-defrag-max-scan-fields' configuration directive. + * + * Smaller keys, keys that do not implement free_effort or the global + * defrag callback are not called in late-defrag mode. In those cases, a + * call to this function will return REDISMODULE_ERR. + * + * The cursor may be used by the module to represent some progress into the + * module's data type. Modules may also store additional cursor-related + * information locally and use the cursor as a flag that indicates when + * traversal of a new key begins. This is possible because the API makes + * a guarantee that concurrent defragmentation of multiple keys will + * not be performed. + */ +int RM_DefragCursorSet(RedisModuleDefragCtx *ctx, unsigned long cursor) { + if (!ctx->cursor) + return REDISMODULE_ERR; + + *ctx->cursor = cursor; + return REDISMODULE_OK; +} + +/* Fetch a cursor value that has been previously stored using RM_DefragCursorSet(). + * + * If not called for a late defrag operation, REDISMODULE_ERR will be returned and + * the cursor should be ignored. See DM_DefragCursorSet() for more details on + * defrag cursors. + */ +int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) { + if (!ctx->cursor) + return REDISMODULE_ERR; + + *cursor = *ctx->cursor; + return REDISMODULE_OK; +} + +/* Defrag a memory allocation previously allocated by RM_Alloc, RM_Calloc, etc. + * The defragmentation process involves allocating a new memory block and copying + * the contents to it, like realloc(). + * + * If defragmentation was not necessary, NULL is returned and the operation has + * no other effect. + * + * If a non-NULL value is returned, the caller should use the new pointer instead + * of the old one and update any reference to the old pointer, which must not + * be used again. + */ +void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { + void *newptr = activeDefragAlloc(ptr); + if (newptr) + ctx->defragged++; + + return newptr; +} + +/* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc. + * See RM_DefragAlloc() for more information on how the defragmentation process + * works. + * + * NOTE: It is only possible to defrag strings that have a single reference. + * Typically this means strings retained with RM_RetainString or RM_HoldString + * may not be defragmentable. One exception is command argvs which, if retained + * by the module, will end up with a single reference (because the reference + * on the Redis side is dropped as soon as the command callback returns). + */ +RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisModuleString *str) { + return activeDefragStringOb(str, &ctx->defragged); +} + + +/* Perform a late defrag of a module datatype key. + * + * Returns a zero value (and initializes the cursor) if no more needs to be done, + * or a non-zero value otherwise. + */ +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged) { + moduleValue *mv = value->ptr; + moduleType *mt = mv->type; + + RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor }; + + /* Invoke callback. Note that the callback may be missing if the key has been + * replaced with a different type since our last visit. + */ + int ret = 0; + if (mt->defrag) + ret = mt->defrag(&defrag_ctx, key, &mv->value); + + *defragged += defrag_ctx.defragged; + if (!ret) { + *cursor = 0; /* No more work to do */ + return 0; + } + + return 1; +} + +/* Attempt to defrag a module data type value. Depending on complexity, + * the operation may happen immediately or be scheduled for later. + * + * Returns 1 if the operation has been completed or 0 if it needs to + * be scheduled for late defrag. + */ +int moduleDefragValue(robj *key, robj *value, long *defragged) { + moduleValue *mv = value->ptr; + moduleType *mt = mv->type; + + /* Try to defrag moduleValue itself regardless of whether or not + * defrag callbacks are provided. + */ + moduleValue *newmv = activeDefragAlloc(mv); + if (newmv) { + (*defragged)++; + value->ptr = mv = newmv; + } + + if (!mt->defrag) + return 1; + + /* Use free_effort to determine complexity of module value, and if + * necessary schedule it for defragLater instead of quick immediate + * defrag. + */ + if (mt->free_effort) { + size_t effort = mt->free_effort(key, mv->value); + if (!effort) + effort = SIZE_MAX; + if (effort > server.active_defrag_max_scan_fields) { + return 0; /* Defrag later */ + } + } + + RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL }; + mt->defrag(&defrag_ctx, key, &mv->value); + (*defragged) += defrag_ctx.defragged; + return 1; +} + +/* Call registered module API defrag functions */ +long moduleDefragGlobals(void) { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + long defragged = 0; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (!module->defrag_cb) + continue; + RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL }; + module->defrag_cb(&defrag_ctx); + defragged += defrag_ctx.defragged; + } + + return defragged; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -8408,4 +8628,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetClientCertificate); REGISTER_API(GetCommandKeys); REGISTER_API(GetTypeMethodVersion); + REGISTER_API(RegisterDefragFunc); + REGISTER_API(DefragAlloc); + REGISTER_API(DefragRedisModuleString); + REGISTER_API(DefragShouldStop); + REGISTER_API(DefragCursorSet); + REGISTER_API(DefragCursorGet); } diff --git a/src/redismodule.h b/src/redismodule.h index e4e8b2b3a..d7d4b7689 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -225,6 +225,7 @@ typedef struct RedisModuleEvent { } RedisModuleEvent; struct RedisModuleCtx; +struct RedisModuleDefragCtx; typedef void (*RedisModuleEventCallback)(struct RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data); static const RedisModuleEvent @@ -479,6 +480,7 @@ typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; typedef struct RedisModuleServerInfoData RedisModuleServerInfoData; typedef struct RedisModuleScanCursor RedisModuleScanCursor; +typedef struct RedisModuleDefragCtx RedisModuleDefragCtx; typedef struct RedisModuleUser RedisModuleUser; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); @@ -495,6 +497,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef size_t (*RedisModuleTypeFreeEffortFunc)(RedisModuleString *key, const void *value); typedef void (*RedisModuleTypeUnlinkFunc)(RedisModuleString *key, const void *value); typedef void *(*RedisModuleTypeCopyFunc)(RedisModuleString *fromkey, RedisModuleString *tokey, const void *value); +typedef int (*RedisModuleTypeDefragFunc)(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); @@ -503,6 +506,7 @@ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_repor typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); +typedef int (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); typedef struct RedisModuleTypeMethods { uint64_t version; @@ -518,6 +522,7 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeFreeEffortFunc free_effort; RedisModuleTypeUnlinkFunc unlink; RedisModuleTypeCopyFunc copy; + RedisModuleTypeDefragFunc defrag; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ @@ -776,6 +781,12 @@ REDISMODULE_API int (*RedisModule_AuthenticateClientWithUser)(RedisModuleCtx *ct REDISMODULE_API int (*RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_GetClientCertificate)(RedisModuleCtx *ctx, uint64_t id) REDISMODULE_ATTR; REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR; +REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisModuleDefragCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR; #endif #define RedisModule_IsAOFClient(id) ((id) == CLIENT_ID_AOF) @@ -1025,6 +1036,12 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(AuthenticateClientWithUser); REDISMODULE_GET_API(GetClientCertificate); REDISMODULE_GET_API(GetCommandKeys); + REDISMODULE_GET_API(RegisterDefragFunc); + REDISMODULE_GET_API(DefragAlloc); + REDISMODULE_GET_API(DefragRedisModuleString); + REDISMODULE_GET_API(DefragShouldStop); + REDISMODULE_GET_API(DefragCursorSet); + REDISMODULE_GET_API(DefragCursorGet); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/server.h b/src/server.h index 4cb5ab855..89a25a4e0 100644 --- a/src/server.h +++ b/src/server.h @@ -514,6 +514,7 @@ struct RedisModuleIO; struct RedisModuleDigest; struct RedisModuleCtx; struct redisObject; +struct RedisModuleDefragCtx; /* Each module type implementation should export a set of methods in order * to serialize and deserialize the value in the RDB file, rewrite the AOF @@ -530,6 +531,7 @@ typedef void (*moduleTypeFreeFunc)(void *value); typedef size_t (*moduleTypeFreeEffortFunc)(struct redisObject *key, const void *value); typedef void (*moduleTypeUnlinkFunc)(struct redisObject *key, void *value); typedef void *(*moduleTypeCopyFunc)(struct redisObject *fromkey, struct redisObject *tokey, const void *value); +typedef int (*moduleTypeDefragFunc)(struct RedisModuleDefragCtx *ctx, struct redisObject *key, void **value); /* This callback type is called by moduleNotifyUserChanged() every time * a user authenticated via the module API is associated with a different @@ -552,6 +554,7 @@ typedef struct RedisModuleType { moduleTypeFreeEffortFunc free_effort; moduleTypeUnlinkFunc unlink; moduleTypeCopyFunc copy; + moduleTypeDefragFunc defrag; moduleTypeAuxLoadFunc aux_load; moduleTypeAuxSaveFunc aux_save; int aux_save_triggers; @@ -1699,6 +1702,9 @@ int moduleClientIsBlockedOnKeys(client *c); void moduleNotifyUserChanged(client *c); void moduleNotifyKeyUnlink(robj *key, robj *val); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value); +int moduleDefragValue(robj *key, robj *obj, long *defragged); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged); +long moduleDefragGlobals(void); /* Utils */ long long ustime(void); @@ -2138,6 +2144,8 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh); void checkChildrenDone(void); int setOOMScoreAdj(int process_class); void rejectCommandFormat(client *c, const char *fmt, ...); +void *activeDefragAlloc(void *ptr); +robj *activeDefragStringOb(robj* ob, long *defragged); #define RESTART_SERVER_NONE 0 #define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */ diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 36222fc1f..5b2c30c17 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -28,6 +28,7 @@ TEST_MODULES = \ getkeys.so \ test_lazyfree.so \ timer.so \ + defragtest.so .PHONY: all diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c new file mode 100644 index 000000000..b63680c63 --- /dev/null +++ b/tests/modules/defragtest.c @@ -0,0 +1,234 @@ +/* A module that implements defrag callback mechanisms. + */ + +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +static RedisModuleType *FragType; + +struct FragObject { + unsigned long len; + void **values; + int maxstep; +}; + +/* Make sure we get the expected cursor */ +unsigned long int last_set_cursor = 0; + +unsigned long int datatype_attempts = 0; +unsigned long int datatype_defragged = 0; +unsigned long int datatype_resumes = 0; +unsigned long int datatype_wrong_cursor = 0; +unsigned long int global_attempts = 0; +unsigned long int global_defragged = 0; + +int global_strings_len = 0; +RedisModuleString **global_strings = NULL; + +static void createGlobalStrings(RedisModuleCtx *ctx, int count) +{ + global_strings_len = count; + global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count); + + for (int i = 0; i < count; i++) { + global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i); + } +} + +static int defragGlobalStrings(RedisModuleDefragCtx *ctx) +{ + for (int i = 0; i < global_strings_len; i++) { + RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]); + global_attempts++; + if (new != NULL) { + global_strings[i] = new; + global_defragged++; + } + } + + return 0; +} + +static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { + REDISMODULE_NOT_USED(for_crash_report); + + RedisModule_InfoAddSection(ctx, "stats"); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_attempts", datatype_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_defragged", datatype_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor); + RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged); +} + +struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) { + struct FragObject *o = RedisModule_Alloc(sizeof(*o)); + o->len = len; + o->values = RedisModule_Alloc(sizeof(RedisModuleString*) * len); + o->maxstep = maxstep; + + for (unsigned long i = 0; i < len; i++) { + o->values[i] = RedisModule_Calloc(1, size); + } + + return o; +} + +/* FRAG.RESETSTATS */ +static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + datatype_attempts = 0; + datatype_defragged = 0; + datatype_resumes = 0; + datatype_wrong_cursor = 0; + global_attempts = 0; + global_defragged = 0; + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* FRAG.CREATE key len size maxstep */ +static int fragCreateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY) + { + return RedisModule_ReplyWithError(ctx, "ERR key exists"); + } + + long long len; + if ((RedisModule_StringToLongLong(argv[2], &len) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid len"); + } + + long long size; + if ((RedisModule_StringToLongLong(argv[3], &size) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid size"); + } + + long long maxstep; + if ((RedisModule_StringToLongLong(argv[4], &maxstep) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid maxstep"); + } + + struct FragObject *o = createFragObject(len, size, maxstep); + RedisModule_ModuleTypeSetValue(key, FragType, o); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + RedisModule_CloseKey(key); + + return REDISMODULE_OK; +} + +void FragFree(void *value) { + struct FragObject *o = value; + + for (unsigned long i = 0; i < o->len; i++) + RedisModule_Free(o->values[i]); + RedisModule_Free(o->values); + RedisModule_Free(o); +} + +size_t FragFreeEffort(RedisModuleString *key, const void *value) { + REDISMODULE_NOT_USED(key); + + const struct FragObject *o = value; + return o->len; +} + +int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) { + REDISMODULE_NOT_USED(key); + unsigned long i = 0; + int steps = 0; + + /* Attempt to get cursor, validate it's what we're exepcting */ + if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) { + if (i > 0) datatype_resumes++; + + /* Validate we're expecting this cursor */ + if (i != last_set_cursor) datatype_wrong_cursor++; + } else { + if (last_set_cursor != 0) datatype_wrong_cursor++; + } + + /* Attempt to defrag the object itself */ + datatype_attempts++; + struct FragObject *o = RedisModule_DefragAlloc(ctx, *value); + if (o == NULL) { + /* Not defragged */ + o = *value; + } else { + /* Defragged */ + *value = o; + datatype_defragged++; + } + + /* Deep defrag now */ + for (; i < o->len; i++) { + datatype_attempts++; + void *new = RedisModule_DefragAlloc(ctx, o->values[i]); + if (new) { + o->values[i] = new; + datatype_defragged++; + } + + if ((o->maxstep && ++steps > o->maxstep) || + ((i % 64 == 0) && RedisModule_DefragShouldStop(ctx))) + { + RedisModule_DefragCursorSet(ctx, i); + last_set_cursor = i; + return 1; + } + } + + last_set_cursor = 0; + return 0; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "defragtest", 1, REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (RedisModule_GetTypeMethodVersion() < REDISMODULE_TYPE_METHOD_VERSION) { + return REDISMODULE_ERR; + } + + long long glen; + if (argc != 1 || RedisModule_StringToLongLong(argv[0], &glen) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + createGlobalStrings(ctx, glen); + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .free = FragFree, + .free_effort = FragFreeEffort, + .defrag = FragDefrag + }; + + FragType = RedisModule_CreateDataType(ctx, "frag_type", 0, &tm); + if (FragType == NULL) return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "frag.create", + fragCreateCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "frag.resetstats", + fragResetStatsCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_RegisterInfoFunc(ctx, FragInfo); + RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings); + + return REDISMODULE_OK; +} diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 0807ebe45..e4b70ed20 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -46,13 +46,17 @@ proc warnings_from_file {filename} { join $result "\n" } -# Return value for INFO property -proc status {r property} { - if {[regexp "\r\n$property:(.*?)\r\n" [{*}$r info] _ value]} { +proc getInfoProperty {infostr property} { + if {[regexp "\r\n$property:(.*?)\r\n" $infostr _ value]} { set _ $value } } +# Return value for INFO property +proc status {r property} { + set _ [getInfoProperty [{*}$r info] $property] +} + proc waitForBgsave r { while 1 { if {[status r rdb_bgsave_in_progress] eq 1} { diff --git a/tests/unit/moduleapi/defrag.tcl b/tests/unit/moduleapi/defrag.tcl new file mode 100644 index 000000000..d4abbac1a --- /dev/null +++ b/tests/unit/moduleapi/defrag.tcl @@ -0,0 +1,43 @@ +set testmodule [file normalize tests/modules/defragtest.so] + +start_server {tags {"modules"} overrides {{save ""}}} { + r module load $testmodule 10000 + r config set hz 100 + + test {Module defrag: simple key defrag works} { + r frag.create key1 1 1000 0 + + r config set active-defrag-ignore-bytes 1 + r config set active-defrag-threshold-lower 0 + r config set active-defrag-cycle-min 99 + r config set activedefrag yes + + after 2000 + set info [r info defragtest_stats] + assert {[getInfoProperty $info defragtest_datatype_attempts] > 0} + assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes] + } + + test {Module defrag: late defrag with cursor works} { + r flushdb + r frag.resetstats + + # key can only be defragged in no less than 10 iterations + # due to maxstep + r frag.create key2 10000 100 1000 + + after 2000 + set info [r info defragtest_stats] + assert {[getInfoProperty $info defragtest_datatype_resumes] > 10} + assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor] + } + + test {Module defrag: global defrag works} { + r flushdb + r frag.resetstats + + after 2000 + set info [r info defragtest_stats] + assert {[getInfoProperty $info defragtest_global_attempts] > 0} + } +}