diff --git a/src/module.c b/src/module.c index 3074b2b19..a9db0a3a5 100644 --- a/src/module.c +++ b/src/module.c @@ -52,6 +52,7 @@ struct RedisModule { list *using; /* List of modules we use some APIs of. */ list *filters; /* List of filters the module has registered. */ int in_call; /* RM_Call() nesting level */ + int options; /* Moduile options and capabilities. */ }; typedef struct RedisModule RedisModule; @@ -772,6 +773,19 @@ long long RM_Milliseconds(void) { return mstime(); } +/* Set flags defining capabilities or behavior bit flags. + * + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS: + * Generally, modules don't need to bother with this, as the process will just + * terminate if a read error happens, however, setting this flag would allow + * repl-diskless-load to work if enabled. + * The module should use RedisModule_IsIOError after reads, before using the + * data that was read, and in case of error, propagate it upwards, and also be + * able to release the partially populated value and all it's allocations. */ +void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { + ctx->module->options = options; +} + /* -------------------------------------------------------------------------- * Automatic memory management for modules * -------------------------------------------------------------------------- */ @@ -3150,9 +3164,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) { * RDB loading and saving functions * -------------------------------------------------------------------------- */ -/* Called when there is a load error in the context of a module. This cannot - * be recovered like for the built-in types. */ +/* Called when there is a load error in the context of a module. On some + * modules this cannot be recovered, but if the module declared capability + * to handle errors, we'll raise a flag rather than exiting. */ void moduleRDBLoadError(RedisModuleIO *io) { + if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) { + io->error = 1; + return; + } serverLog(LL_WARNING, "Error loading data from RDB (short read or EOF). " "Read performed by module '%s' about type '%s' " @@ -3163,6 +3182,33 @@ void moduleRDBLoadError(RedisModuleIO *io) { exit(1); } +/* Returns 0 if there's at least one registered data type that did not declare + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should + * be avoided since it could cause data loss. */ +int moduleAllDatatypesHandleErrors() { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (listLength(module->types) && + !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)) + { + dictReleaseIterator(di); + return 0; + } + } + dictReleaseIterator(di); + return 1; +} + +/* Returns true if any previous IO API failed. + * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with + * RediModule_SetModuleOptions first. */ +int RM_IsIOError(RedisModuleIO *io) { + return io->error; +} + /* Save an unsigned 64 bit value into the RDB file. This function should only * be called in the context of the rdb_save method of modules implementing new * data types. */ @@ -3186,6 +3232,7 @@ saveerr: * be called in the context of the rdb_load method of modules implementing * new data types. */ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr; @@ -3197,7 +3244,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */ @@ -3256,6 +3303,7 @@ saveerr: /* Implements RM_LoadString() and RM_LoadStringBuffer() */ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { + if (io->error) return NULL; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr; @@ -3267,7 +3315,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { loaderr: moduleRDBLoadError(io); - return NULL; /* Never reached. */ + return NULL; } /* In the context of the rdb_load method of a module data type, loads a string @@ -3316,6 +3364,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * double value saved by RedisModule_SaveDouble(). */ double RM_LoadDouble(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr; @@ -3327,7 +3376,7 @@ double RM_LoadDouble(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* In the context of the rdb_save method of a module data type, saves a float @@ -3352,6 +3401,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * float value saved by RedisModule_SaveFloat(). */ float RM_LoadFloat(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr; @@ -3363,7 +3413,7 @@ float RM_LoadFloat(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* Iterate over modules, and trigger rdb aux saving for the ones modules types @@ -5461,6 +5511,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ModuleTypeSetValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); + REGISTER_API(IsIOError); + REGISTER_API(SetModuleOptions); REGISTER_API(SaveUnsigned); REGISTER_API(LoadUnsigned); REGISTER_API(SaveSigned); diff --git a/src/redismodule.h b/src/redismodule.h index 45fcc2091..ad1c6e652 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -144,6 +144,9 @@ typedef uint64_t RedisModuleTimerID; /* Do filter RedisModule_Call() commands initiated by module itself. */ #define REDISMODULE_CMDFILTER_NOSELF (1<<0) +/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */ +#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -280,6 +283,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); +int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); @@ -454,6 +459,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); + REDISMODULE_GET_API(IsIOError); + REDISMODULE_GET_API(SetModuleOptions); REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(SaveSigned); diff --git a/src/replication.c b/src/replication.c index d6646c9ef..bb4287b62 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1139,8 +1139,15 @@ void restartAOFAfterSYNC() { static int useDisklessLoad() { /* compute boolean decision to use diskless load */ - return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); + /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ + if (enabled && !moduleAllDatatypesHandleErrors()) { + serverLog(LL_WARNING, + "Skipping diskless-load because there are modules that don't handle read errors."); + enabled = 0; + } + return enabled; } /* Helper function for readSyncBulkPayload() to make backups of the current diff --git a/src/server.h b/src/server.h index 31d253066..d0d0ece1c 100644 --- a/src/server.h +++ b/src/server.h @@ -1541,6 +1541,7 @@ void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); ssize_t rdbSaveModulesAux(rio *rdb, int when); +int moduleAllDatatypesHandleErrors(); /* Utils */ long long ustime(void); diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 415497a2f..d73c8bfd3 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -15,6 +15,8 @@ RedisModuleString *after_str = NULL; void *testrdb_type_load(RedisModuleIO *rdb, int encver) { int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return NULL; assert(count==1); assert(encver==1); RedisModuleString *str = RedisModule_LoadString(rdb); @@ -57,6 +59,8 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { RedisModule_FreeString(ctx, before_str); before_str = NULL; int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; if (count) before_str = RedisModule_LoadString(rdb); } else { @@ -64,14 +68,19 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { RedisModule_FreeString(ctx, after_str); after_str = NULL; int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; if (count) after_str = RedisModule_LoadString(rdb); } + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; return REDISMODULE_OK; } void testrdb_type_free(void *value) { - RedisModule_FreeString(NULL, (RedisModuleString*)value); + if (value) + RedisModule_FreeString(NULL, (RedisModuleString*)value); } int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) @@ -171,6 +180,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + if (argc > 0) RedisModule_StringToLongLong(argv[0], &conf_aux_count); diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl index 22201a08e..c72570002 100644 --- a/tests/unit/moduleapi/testrdb.tcl +++ b/tests/unit/moduleapi/testrdb.tcl @@ -56,7 +56,67 @@ tags "modules" { } } + tags {repl} { + test {diskless loading short read with module} { + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] - # TODO: test short read handling + # Set master and replica to use diskless replication + $master config set repl-diskless-sync yes + $master config set rdbcompression no + $replica config set repl-diskless-load swapdb + for {set k 0} {$k < 30} {incr k} { + r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]] + } + # Start the replication process... + $master config set repl-diskless-sync-delay 0 + $replica replicaof $master_host $master_port + + # kill the replication at various points + set attempts 3 + if {$::accurate} { set attempts 10 } + for {set i 0} {$i < $attempts} {incr i} { + # wait for the replica to start reading the rdb + # using the log file since the replica only responds to INFO once in 2mb + wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1 + + # add some additional random sleep so that we kill the master on a different place each time + after [expr {int(rand()*100)}] + + # kill the replica connection on the master + set killed [$master client kill type replica] + + if {[catch { + set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10] + if {$::verbose} { + puts $res + } + }]} { + puts "failed triggering short read" + # force the replica to try another full sync + $master client kill type replica + $master set asdf asdf + # the side effect of resizing the backlog is that it is flushed (16k is the min size) + $master config set repl-backlog-size [expr {16384 + $i}] + } + # wait for loading to stop (fail) + wait_for_condition 100 10 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + } + # enable fast shutdown + $master config set rdb-key-save-delay 0 + } + } + } + } }