diff --git a/src/module.c b/src/module.c index 11b47d99f..33ac151a2 100644 --- a/src/module.c +++ b/src/module.c @@ -6381,6 +6381,9 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj * so that meta information such as key name and db id can be obtained. * * **copy2**: Similar to `copy`, but provides the `RedisModuleKeyOptCtx` parameter * so that meta information such as key names and db ids can be obtained. + * * **aux_save2**: Similar to `aux_save`, but with small semantic change, if the module + * saves nothing on this callback then no data about this aux field will be written to the + * RDB and it will be possible to load the RDB even if the module is not loaded. * * Note: the module name "AAAAAAAAA" is reserved and produces an error, it * happens to be pretty lame as well. @@ -6433,6 +6436,9 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeUnlinkFunc2 unlink2; moduleTypeCopyFunc2 copy2; } v4; + struct { + moduleTypeAuxSaveFunc aux_save2; + } v5; } *tms = (struct typemethods*) typemethods_ptr; moduleType *mt = zcalloc(sizeof(*mt)); @@ -6461,6 +6467,9 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, mt->free_effort2 = tms->v4.free_effort2; mt->copy2 = tms->v4.copy2; } + if (tms->version >= 5) { + mt->aux_save2 = tms->v5.aux_save2; + } memcpy(mt->name,name,sizeof(mt->name)); listAddNodeTail(ctx->module->types,mt); return mt; @@ -6575,11 +6584,25 @@ int RM_IsIOError(RedisModuleIO *io) { return io->error; } +static int flushRedisModuleIOBuffer(RedisModuleIO *io) { + if (!io->pre_flush_buffer) return 0; + + /* We have data that must be flushed before saving the current data. + * Lets flush it. */ + sds pre_flush_buffer = io->pre_flush_buffer; + io->pre_flush_buffer = NULL; + ssize_t retval = rdbWriteRaw(io->rio, pre_flush_buffer, sdslen(pre_flush_buffer)); + sdsfree(pre_flush_buffer); + if (retval >= 0) io->bytes += retval; + return retval; +} + /* Save an unsigned 64 bit value into the RDB file. This function should only * be called in the context of the rdb_save method of modules implementing new * data types. */ void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) { if (io->error) return; + if (flushRedisModuleIOBuffer(io) == -1) goto saveerr; /* Save opcode. */ int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT); if (retval == -1) goto saveerr; @@ -6633,6 +6656,7 @@ int64_t RM_LoadSigned(RedisModuleIO *io) { * the RDB file. */ void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) { if (io->error) return; + if (flushRedisModuleIOBuffer(io) == -1) goto saveerr; /* Save opcode. */ ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); if (retval == -1) goto saveerr; @@ -6651,6 +6675,7 @@ saveerr: * as input. */ void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) { if (io->error) return; + if (flushRedisModuleIOBuffer(io) == -1) goto saveerr; /* Save opcode. */ ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING); if (retval == -1) goto saveerr; @@ -6709,6 +6734,7 @@ char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) { * It is possible to load back the value with RedisModule_LoadDouble(). */ void RM_SaveDouble(RedisModuleIO *io, double value) { if (io->error) return; + if (flushRedisModuleIOBuffer(io) == -1) goto saveerr; /* Save opcode. */ int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE); if (retval == -1) goto saveerr; @@ -6744,6 +6770,7 @@ loaderr: * It is possible to load back the value with RedisModule_LoadFloat(). */ void RM_SaveFloat(RedisModuleIO *io, float value) { if (io->error) return; + if (flushRedisModuleIOBuffer(io) == -1) goto saveerr; /* Save opcode. */ int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT); if (retval == -1) goto saveerr; @@ -6814,7 +6841,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when) { listRewind(module->types,&li); while((ln = listNext(&li))) { moduleType *mt = ln->value; - if (!mt->aux_save || !(mt->aux_save_triggers & when)) + if ((!mt->aux_save && !mt->aux_save2) || !(mt->aux_save_triggers & when)) continue; ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt); if (ret==-1) { diff --git a/src/rdb.c b/src/rdb.c index 24628a89b..ff8a1f53d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -100,7 +100,7 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) { exit(1); } -static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) { +ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) { if (rdb && rioWrite(rdb,p,len) == 0) return -1; return len; @@ -1186,30 +1186,49 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { /* Save a module-specific aux value. */ RedisModuleIO io; - int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX); - if (retval == -1) return -1; + int retval = 0; moduleInitIOContext(io,mt,rdb,NULL,-1); - io.bytes += retval; + + /* We save the AUX field header in a temporary buffer so we can support aux_save2 API. + * If aux_save2 is used the buffer will be flushed at the first time the module will perform + * a write operation to the RDB and will be ignored is case there was no writes. */ + rio aux_save_headers_rio; + rioInitWithBuffer(&aux_save_headers_rio, sdsempty()); + + if (rdbSaveType(&aux_save_headers_rio, RDB_OPCODE_MODULE_AUX) == -1) goto error; /* Write the "module" identifier as prefix, so that we'll be able * to call the right module during loading. */ - retval = rdbSaveLen(rdb,mt->id); - if (retval == -1) return -1; - io.bytes += retval; + if (rdbSaveLen(&aux_save_headers_rio,mt->id) == -1) goto error; /* write the 'when' so that we can provide it on loading. add a UINT opcode * for backwards compatibility, everything after the MT needs to be prefixed * by an opcode. */ - retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_UINT); - if (retval == -1) return -1; - io.bytes += retval; - retval = rdbSaveLen(rdb,when); - if (retval == -1) return -1; - io.bytes += retval; + if (rdbSaveLen(&aux_save_headers_rio,RDB_MODULE_OPCODE_UINT) == -1) goto error; + if (rdbSaveLen(&aux_save_headers_rio,when) == -1) goto error; /* Then write the module-specific representation + EOF marker. */ - mt->aux_save(&io,when); + if (mt->aux_save2) { + io.pre_flush_buffer = aux_save_headers_rio.io.buffer.ptr; + mt->aux_save2(&io,when); + if (io.pre_flush_buffer) { + /* aux_save did not save any data to the RDB. + * We will avoid saving any data related to this aux type + * to allow loading this RDB if the module is not present. */ + sdsfree(io.pre_flush_buffer); + io.pre_flush_buffer = NULL; + return 0; + } + } else { + /* Write headers now, aux_save does not do lazy saving of the headers. */ + retval = rdbWriteRaw(rdb, aux_save_headers_rio.io.buffer.ptr, sdslen(aux_save_headers_rio.io.buffer.ptr)); + if (retval == -1) goto error; + io.bytes += retval; + sdsfree(aux_save_headers_rio.io.buffer.ptr); + mt->aux_save(&io,when); + } retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF); + serverAssert(!io.pre_flush_buffer); if (retval == -1) io.error = 1; else @@ -1222,6 +1241,9 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { if (io.error) return -1; return io.bytes; +error: + sdsfree(aux_save_headers_rio.io.buffer.ptr); + return -1; } ssize_t rdbSaveFunctions(rio *rdb) { diff --git a/src/rdb.h b/src/rdb.h index 010c4d7df..d215334db 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -139,6 +139,7 @@ #define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ #define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ +ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len); int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); time_t rdbLoadTime(rio *rdb); diff --git a/src/redismodule.h b/src/redismodule.h index 90a79a97c..a2256e365 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -17,7 +17,7 @@ /* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods * structure is changed, this version number needs to be changed synchronistically. */ -#define REDISMODULE_TYPE_METHOD_VERSION 4 +#define REDISMODULE_TYPE_METHOD_VERSION 5 /* API flags and constants */ #define REDISMODULE_READ (1<<0) @@ -881,6 +881,7 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeFreeEffortFunc2 free_effort2; RedisModuleTypeUnlinkFunc2 unlink2; RedisModuleTypeCopyFunc2 copy2; + RedisModuleTypeAuxSaveFunc aux_save2; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ diff --git a/src/server.h b/src/server.h index 4d6eb47ba..d5c0b4917 100644 --- a/src/server.h +++ b/src/server.h @@ -729,6 +729,7 @@ typedef struct RedisModuleType { moduleTypeFreeEffortFunc2 free_effort2; moduleTypeUnlinkFunc2 unlink2; moduleTypeCopyFunc2 copy2; + moduleTypeAuxSaveFunc aux_save2; int aux_save_triggers; char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ } moduleType; @@ -786,6 +787,8 @@ struct RedisModuleIO { struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct redisObject *key; /* Optional name of key processed */ int dbid; /* The dbid of the key being processed, -1 when unknown. */ + sds pre_flush_buffer; /* A buffer that should be flushed before next write operation + * See rdbSaveSingleModuleAux for more details */ }; /* Macro to initialize an IO context. Note that the 'ver' field is populated @@ -798,6 +801,7 @@ struct RedisModuleIO { iovar.key = keyptr; \ iovar.dbid = db; \ iovar.ctx = NULL; \ + iovar.pre_flush_buffer = NULL; \ } while(0) /* This is a structure used to export DEBUG DIGEST capabilities to Redis diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 6f2ca2a14..c31aebb10 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -4,6 +4,11 @@ #include /* Module configuration, save aux or not? */ +#define CONF_AUX_OPTION_NO_AUX 0 +#define CONF_AUX_OPTION_SAVE2 1 << 0 +#define CONF_AUX_OPTION_BEFORE_KEYSPACE 1 << 1 +#define CONF_AUX_OPTION_AFTER_KEYSPACE 1 << 2 +#define CONF_AUX_OPTION_NO_DATA 1 << 3 long long conf_aux_count = 0; /* Registered type */ @@ -21,6 +26,8 @@ RedisModuleString *after_str_temp = NULL; * We control this value from RedisModuleEvent_ReplAsyncLoad events. */ int async_loading = 0; +int n_aux_load_called = 0; + void replAsyncLoadCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { REDISMODULE_NOT_USED(e); @@ -88,8 +95,9 @@ void testrdb_type_save(RedisModuleIO *rdb, void *value) { } void testrdb_aux_save(RedisModuleIO *rdb, int when) { - if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB); - if (conf_aux_count==0) assert(0); + if (!(conf_aux_count & CONF_AUX_OPTION_BEFORE_KEYSPACE)) assert(when == REDISMODULE_AUX_AFTER_RDB); + if (!(conf_aux_count & CONF_AUX_OPTION_AFTER_KEYSPACE)) assert(when == REDISMODULE_AUX_BEFORE_RDB); + assert(conf_aux_count!=CONF_AUX_OPTION_NO_AUX); if (when == REDISMODULE_AUX_BEFORE_RDB) { if (before_str) { RedisModule_SaveSigned(rdb, 1); @@ -109,8 +117,9 @@ void testrdb_aux_save(RedisModuleIO *rdb, int when) { int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { assert(encver == 1); - if (conf_aux_count==1) assert(when == REDISMODULE_AUX_AFTER_RDB); - if (conf_aux_count==0) assert(0); + if (!(conf_aux_count & CONF_AUX_OPTION_BEFORE_KEYSPACE)) assert(when == REDISMODULE_AUX_AFTER_RDB); + if (!(conf_aux_count & CONF_AUX_OPTION_AFTER_KEYSPACE)) assert(when == REDISMODULE_AUX_BEFORE_RDB); + assert(conf_aux_count!=CONF_AUX_OPTION_NO_AUX); RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); if (when == REDISMODULE_AUX_BEFORE_RDB) { if (async_loading == 0) { @@ -269,6 +278,28 @@ int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int testrdb_get_n_aux_load_called(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ReplyWithLongLong(ctx, n_aux_load_called); + return REDISMODULE_OK; +} + +int test2rdb_aux_load(RedisModuleIO *rdb, int encver, int when) { + REDISMODULE_NOT_USED(rdb); + REDISMODULE_NOT_USED(encver); + REDISMODULE_NOT_USED(when); + n_aux_load_called++; + return REDISMODULE_OK; +} + +void test2rdb_aux_save(RedisModuleIO *rdb, int when) { + REDISMODULE_NOT_USED(rdb); + REDISMODULE_NOT_USED(when); +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -280,7 +311,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (argc > 0) RedisModule_StringToLongLong(argv[0], &conf_aux_count); - if (conf_aux_count==0) { + if (conf_aux_count==CONF_AUX_OPTION_NO_AUX) { RedisModuleTypeMethods datatype_methods = { .version = 1, .rdb_load = testrdb_type_load, @@ -293,7 +324,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods); if (testrdb_type == NULL) return REDISMODULE_ERR; - } else { + } else if (!(conf_aux_count & CONF_AUX_OPTION_NO_DATA)) { RedisModuleTypeMethods datatype_methods = { .version = REDISMODULE_TYPE_METHOD_VERSION, .rdb_load = testrdb_type_load, @@ -303,14 +334,32 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) .free = testrdb_type_free, .aux_load = testrdb_aux_load, .aux_save = testrdb_aux_save, - .aux_save_triggers = (conf_aux_count == 1 ? - REDISMODULE_AUX_AFTER_RDB : - REDISMODULE_AUX_BEFORE_RDB | REDISMODULE_AUX_AFTER_RDB) + .aux_save_triggers = ((conf_aux_count & CONF_AUX_OPTION_BEFORE_KEYSPACE) ? REDISMODULE_AUX_BEFORE_RDB : 0) | + ((conf_aux_count & CONF_AUX_OPTION_AFTER_KEYSPACE) ? REDISMODULE_AUX_AFTER_RDB : 0) }; + if (conf_aux_count & CONF_AUX_OPTION_SAVE2) { + datatype_methods.aux_save2 = testrdb_aux_save; + } + testrdb_type = RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods); if (testrdb_type == NULL) return REDISMODULE_ERR; + } else { + + /* Used to verify that aux_save2 api without any data, saves nothing to the RDB. */ + RedisModuleTypeMethods datatype_methods = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .aux_load = test2rdb_aux_load, + .aux_save = test2rdb_aux_save, + .aux_save_triggers = ((conf_aux_count & CONF_AUX_OPTION_BEFORE_KEYSPACE) ? REDISMODULE_AUX_BEFORE_RDB : 0) | + ((conf_aux_count & CONF_AUX_OPTION_AFTER_KEYSPACE) ? REDISMODULE_AUX_AFTER_RDB : 0) + }; + if (conf_aux_count & CONF_AUX_OPTION_SAVE2) { + datatype_methods.aux_save2 = test2rdb_aux_save; + } + + RedisModule_CreateDataType(ctx, "test__rdb", 1, &datatype_methods); } if (RedisModule_CreateCommand(ctx,"testrdb.set.before", testrdb_set_before,"deny-oom",0,0,0) == REDISMODULE_ERR) @@ -334,6 +383,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"testrdb.get.key", testrdb_get_key,"",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"testrdb.get.n_aux_load_called", testrdb_get_n_aux_load_called,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ReplAsyncLoad, replAsyncLoadCallback); diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl index a01bcb30b..2545a8ad2 100644 --- a/tests/unit/moduleapi/testrdb.tcl +++ b/tests/unit/moduleapi/testrdb.tcl @@ -1,3 +1,9 @@ +# This module can be configure with multiple options given as flags on module load time +# 0 - not aux fields will be declared (this is the default) +# 1 << 0 - use aux_save2 api +# 1 << 1 - call aux callback before key space +# 1 << 2 - call aux callback after key space +# 1 << 3 - do not save data on aux callback set testmodule [file normalize tests/modules/testrdb.so] tags "modules" { @@ -21,31 +27,68 @@ tags "modules" { } } + test {aux that saves no data are not saved to the rdb when aux_save2 is used} { + set server_path [tmpdir "server.module-testrdb"] + puts $server_path + # 15 == 1111 - use aux_save2 before and after key space without data + start_server [list overrides [list loadmodule "$testmodule 15" "dir" $server_path] keep_persistence true] { + r set x 1 + r save + } + start_server [list overrides [list "dir" $server_path] keep_persistence true] { + # make sure server started successfully without the module. + assert_equal {1} [r get x] + } + } + + test {aux that saves no data are saved to the rdb when aux_save is used} { + set server_path [tmpdir "server.module-testrdb"] + puts $server_path + # 14 == 1110 - use aux_save before and after key space without data + start_server [list overrides [list loadmodule "$testmodule 14" "dir" $server_path] keep_persistence true] { + r set x 1 + r save + } + start_server [list overrides [list loadmodule "$testmodule 14" "dir" $server_path] keep_persistence true] { + # make sure server started successfully and aux_save was called twice. + assert_equal {1} [r get x] + assert_equal {2} [r testrdb.get.n_aux_load_called] + } + } + + foreach test_case {6 7} { + # 6 == 0110 - use aux_save before and after key space with data + # 7 == 0111 - use aux_save2 before and after key space with data test {modules are able to persist globals before and after} { set server_path [tmpdir "server.module-testrdb"] - start_server [list overrides [list loadmodule "$testmodule 2" "dir" $server_path] keep_persistence true] { + start_server [list overrides [list loadmodule "$testmodule $test_case" "dir" $server_path] keep_persistence true] { r testrdb.set.before global1 r testrdb.set.after global2 assert_equal "global1" [r testrdb.get.before] assert_equal "global2" [r testrdb.get.after] } - start_server [list overrides [list loadmodule "$testmodule 2" "dir" $server_path]] { + start_server [list overrides [list loadmodule "$testmodule $test_case" "dir" $server_path]] { assert_equal "global1" [r testrdb.get.before] assert_equal "global2" [r testrdb.get.after] } } + } + foreach test_case {4 5} { + # 4 == 0100 - use aux_save after key space with data + # 5 == 0101 - use aux_save2 after key space with data test {modules are able to persist globals just after} { set server_path [tmpdir "server.module-testrdb"] - start_server [list overrides [list loadmodule "$testmodule 1" "dir" $server_path] keep_persistence true] { + start_server [list overrides [list loadmodule "$testmodule $test_case" "dir" $server_path] keep_persistence true] { r testrdb.set.after global2 assert_equal "global2" [r testrdb.get.after] } - start_server [list overrides [list loadmodule "$testmodule 1" "dir" $server_path]] { + start_server [list overrides [list loadmodule "$testmodule $test_case" "dir" $server_path]] { assert_equal "global2" [r testrdb.get.after] } } + } test {Verify module options info} { start_server [list overrides [list loadmodule "$testmodule"]] { @@ -142,13 +185,16 @@ tags "modules" { } # Module events for diskless load swapdb when async_loading (matching master replid) + foreach test_case {6 7} { + # 6 == 0110 - use aux_save before and after key space with data + # 7 == 0111 - use aux_save2 before and after key space with data foreach testType {Successful Aborted} { - start_server [list overrides [list loadmodule "$testmodule 2"] tags [list external:skip]] { + start_server [list overrides [list loadmodule "$testmodule $test_case"] tags [list external:skip]] { set replica [srv 0 client] set replica_host [srv 0 host] set replica_port [srv 0 port] set replica_log [srv 0 stdout] - start_server [list overrides [list loadmodule "$testmodule 2"]] { + start_server [list overrides [list loadmodule "$testmodule $test_case"]] { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] @@ -255,5 +301,6 @@ tags "modules" { } } } + } } }