From d7d028a7a72388cf3908a5f40c8188e68a447dee Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 14 Jul 2019 17:34:13 +0300 Subject: [PATCH 1/4] Allow modules to handle RDB loading errors. This is especially needed in diskless loading, were a short read could have caused redis to exit. now the module can handle the error and return to the caller gracefully. this fixes #5326 --- src/module.c | 41 +++++++++++++++++++++++++++++++++++------ src/redismodule.h | 9 +++++++++ src/server.h | 2 ++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/module.c b/src/module.c index f4f753c00..68952b1e0 100644 --- a/src/module.c +++ b/src/module.c @@ -3139,9 +3139,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) { * RDB loading and saving functions * -------------------------------------------------------------------------- */ -/* Called when there is a load error in the context of a module. This cannot - * be recovered like for the built-in types. */ +/* Called when there is a load error in the context of a module. On some + * modules this cannot be recovered, but if the module declared capability + * to handle errors, we'll raise a flag rather than exiting. */ void moduleRDBLoadError(RedisModuleIO *io) { + if (io->flags & REDISMODULE_HANDLE_IO_ERRORS) { + io->error = 1; + return; + } serverLog(LL_WARNING, "Error loading data from RDB (short read or EOF). " "Read performed by module '%s' about type '%s' " @@ -3152,6 +3157,23 @@ void moduleRDBLoadError(RedisModuleIO *io) { exit(1); } +/* Set flags defining capabilities or behavior */ +void RM_SetIOFlags(RedisModuleIO *io, int flags) { + io->flags = flags; +} + +/* Get flags which were set by RedisModule_SetIOFlags */ +int RM_GetIOFlags(RedisModuleIO *io) { + return io->flags; +} + +/* Returns true if any previous IO API failed. + * for Load* APIs the REDISMODULE_HANDLE_IO_ERRORS flag must be set with + * RediModule_SetIOFlags first. */ +int RM_IsIOError(RedisModuleIO *io) { + return io->error; +} + /* Save an unsigned 64 bit value into the RDB file. This function should only * be called in the context of the rdb_save method of modules implementing new * data types. */ @@ -3175,6 +3197,7 @@ saveerr: * be called in the context of the rdb_load method of modules implementing * new data types. */ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr; @@ -3186,7 +3209,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */ @@ -3245,6 +3268,7 @@ saveerr: /* Implements RM_LoadString() and RM_LoadStringBuffer() */ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { + if (io->error) return NULL; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr; @@ -3256,7 +3280,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) { loaderr: moduleRDBLoadError(io); - return NULL; /* Never reached. */ + return NULL; } /* In the context of the rdb_load method of a module data type, loads a string @@ -3305,6 +3329,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * double value saved by RedisModule_SaveDouble(). */ double RM_LoadDouble(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr; @@ -3316,7 +3341,7 @@ double RM_LoadDouble(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* In the context of the rdb_save method of a module data type, saves a float @@ -3341,6 +3366,7 @@ saveerr: /* In the context of the rdb_save method of a module data type, loads back the * float value saved by RedisModule_SaveFloat(). */ float RM_LoadFloat(RedisModuleIO *io) { + if (io->error) return 0; if (io->ver == 2) { uint64_t opcode = rdbLoadLen(io->rio,NULL); if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr; @@ -3352,7 +3378,7 @@ float RM_LoadFloat(RedisModuleIO *io) { loaderr: moduleRDBLoadError(io); - return 0; /* Never reached. */ + return 0; } /* -------------------------------------------------------------------------- @@ -5408,6 +5434,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ModuleTypeSetValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); + REGISTER_API(GetIOFlags); + REGISTER_API(SetIOFlags); + REGISTER_API(IsIOError); REGISTER_API(SaveUnsigned); REGISTER_API(LoadUnsigned); REGISTER_API(SaveSigned); diff --git a/src/redismodule.h b/src/redismodule.h index b9c73957b..154b5b405 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -140,6 +140,9 @@ typedef uint64_t RedisModuleTimerID; /* Do filter RedisModule_Call() commands initiated by module itself. */ #define REDISMODULE_CMDFILTER_NOSELF (1<<0) +/* Declare that the module can handle errors with RedisModule_SetIOFlags */ +#define REDISMODULE_HANDLE_IO_ERRORS (1<<0) + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE @@ -271,6 +274,9 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); +void REDISMODULE_API_FUNC(RedisModule_SetIOFlags)(RedisModuleIO *io, int flags); +int REDISMODULE_API_FUNC(RedisModule_GetIOFlags)(RedisModuleIO *io); +int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); @@ -444,6 +450,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); + REDISMODULE_GET_API(SetIOFlags); + REDISMODULE_GET_API(GetIOFlags); + REDISMODULE_GET_API(IsIOError); REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(SaveSigned); diff --git a/src/server.h b/src/server.h index f81b1010e..9957c3b5c 100644 --- a/src/server.h +++ b/src/server.h @@ -599,6 +599,7 @@ typedef struct RedisModuleIO { * 2 (current version with opcodes annotation). */ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct redisObject *key; /* Optional name of key processed */ + int flags; /* flags declaring capabilities or behavior */ } RedisModuleIO; /* Macro to initialize an IO context. Note that the 'ver' field is populated @@ -611,6 +612,7 @@ typedef struct RedisModuleIO { iovar.ver = 0; \ iovar.key = keyptr; \ iovar.ctx = NULL; \ + iovar.flags = 0; \ } while(0); /* This is a structure used to export DEBUG DIGEST capabilities to Redis From 4339706e07e15fe5a228d175756c4e4be83e2867 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 30 Jul 2019 15:11:57 +0300 Subject: [PATCH 2/4] Avoid diskelss-load if modules did not declare they handle read errors --- src/module.c | 47 +++++++++++++++++++++++++++++++++++------------ src/redismodule.h | 10 ++++------ src/replication.c | 6 +++++- src/server.h | 3 +-- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/src/module.c b/src/module.c index 68952b1e0..36384c018 100644 --- a/src/module.c +++ b/src/module.c @@ -51,6 +51,7 @@ struct RedisModule { list *using; /* List of modules we use some APIs of. */ list *filters; /* List of filters the module has registered. */ int in_call; /* RM_Call() nesting level */ + int options; /* Moduile options and capabilities. */ }; typedef struct RedisModule RedisModule; @@ -771,6 +772,19 @@ long long RM_Milliseconds(void) { return mstime(); } +/* Set flags defining capabilities or behavior bit flags. + * + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS: + * Generally, modules don't need to bother with this, as the process will just + * terminate if a read error happens, however, setting this flag would allow + * repl-diskless-load to work if enabled. + * The module should use RedisModule_IsIOError after reads, before using the + * data that was read, and in case of error, propagate it upwards, and also be + * able to release the partially populated value and all it's allocations. */ +void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { + ctx->module->options = options; +} + /* -------------------------------------------------------------------------- * Automatic memory management for modules * -------------------------------------------------------------------------- */ @@ -3143,7 +3157,7 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) { * modules this cannot be recovered, but if the module declared capability * to handle errors, we'll raise a flag rather than exiting. */ void moduleRDBLoadError(RedisModuleIO *io) { - if (io->flags & REDISMODULE_HANDLE_IO_ERRORS) { + if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) { io->error = 1; return; } @@ -3157,19 +3171,29 @@ void moduleRDBLoadError(RedisModuleIO *io) { exit(1); } -/* Set flags defining capabilities or behavior */ -void RM_SetIOFlags(RedisModuleIO *io, int flags) { - io->flags = flags; -} +/* Returns 0 if there's at least one registered data type that did not declare + * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should + * be avoided since it could cause data loss. */ +int moduleAllDatatypesHandleErrors() { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; -/* Get flags which were set by RedisModule_SetIOFlags */ -int RM_GetIOFlags(RedisModuleIO *io) { - return io->flags; + while ((de = dictNext(di)) != NULL) { + struct RedisModule *module = dictGetVal(de); + if (listLength(module->types) && + !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)) + { + dictReleaseIterator(di); + return 0; + } + } + dictReleaseIterator(di); + return 1; } /* Returns true if any previous IO API failed. - * for Load* APIs the REDISMODULE_HANDLE_IO_ERRORS flag must be set with - * RediModule_SetIOFlags first. */ + * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with + * RediModule_SetModuleOptions first. */ int RM_IsIOError(RedisModuleIO *io) { return io->error; } @@ -5434,9 +5458,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ModuleTypeSetValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); - REGISTER_API(GetIOFlags); - REGISTER_API(SetIOFlags); REGISTER_API(IsIOError); + REGISTER_API(SetModuleOptions); REGISTER_API(SaveUnsigned); REGISTER_API(LoadUnsigned); REGISTER_API(SaveSigned); diff --git a/src/redismodule.h b/src/redismodule.h index 154b5b405..f0f27c067 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -140,8 +140,8 @@ typedef uint64_t RedisModuleTimerID; /* Do filter RedisModule_Call() commands initiated by module itself. */ #define REDISMODULE_CMDFILTER_NOSELF (1<<0) -/* Declare that the module can handle errors with RedisModule_SetIOFlags */ -#define REDISMODULE_HANDLE_IO_ERRORS (1<<0) +/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */ +#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0) /* ------------------------- End of common defines ------------------------ */ @@ -274,9 +274,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); -void REDISMODULE_API_FUNC(RedisModule_SetIOFlags)(RedisModuleIO *io, int flags); -int REDISMODULE_API_FUNC(RedisModule_GetIOFlags)(RedisModuleIO *io); int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); @@ -450,9 +449,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); - REDISMODULE_GET_API(SetIOFlags); - REDISMODULE_GET_API(GetIOFlags); REDISMODULE_GET_API(IsIOError); + REDISMODULE_GET_API(SetModuleOptions); REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(SaveSigned); diff --git a/src/replication.c b/src/replication.c index 26e7cf8f0..14ae2f96c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1115,8 +1115,12 @@ void restartAOFAfterSYNC() { static int useDisklessLoad() { /* compute boolean decision to use diskless load */ - return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); + /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ + if (enabled && !moduleAllDatatypesHandleErrors()) + enabled = 0; + return enabled; } /* Helper function for readSyncBulkPayload() to make backups of the current diff --git a/src/server.h b/src/server.h index 9957c3b5c..5991cfa6c 100644 --- a/src/server.h +++ b/src/server.h @@ -599,7 +599,6 @@ typedef struct RedisModuleIO { * 2 (current version with opcodes annotation). */ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct redisObject *key; /* Optional name of key processed */ - int flags; /* flags declaring capabilities or behavior */ } RedisModuleIO; /* Macro to initialize an IO context. Note that the 'ver' field is populated @@ -612,7 +611,6 @@ typedef struct RedisModuleIO { iovar.ver = 0; \ iovar.key = keyptr; \ iovar.ctx = NULL; \ - iovar.flags = 0; \ } while(0); /* This is a structure used to export DEBUG DIGEST capabilities to Redis @@ -1530,6 +1528,7 @@ void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +int moduleAllDatatypesHandleErrors(); /* Utils */ long long ustime(void); From 07b1abab9543205de4ed2db7bbf28d094a08960b Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 21 Jul 2019 18:18:11 +0300 Subject: [PATCH 3/4] Add test for module diskless short reads --- tests/modules/testrdb.c | 13 ++++++- tests/unit/moduleapi/testrdb.tcl | 62 +++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 415497a2f..d73c8bfd3 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -15,6 +15,8 @@ RedisModuleString *after_str = NULL; void *testrdb_type_load(RedisModuleIO *rdb, int encver) { int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return NULL; assert(count==1); assert(encver==1); RedisModuleString *str = RedisModule_LoadString(rdb); @@ -57,6 +59,8 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { RedisModule_FreeString(ctx, before_str); before_str = NULL; int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; if (count) before_str = RedisModule_LoadString(rdb); } else { @@ -64,14 +68,19 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) { RedisModule_FreeString(ctx, after_str); after_str = NULL; int count = RedisModule_LoadSigned(rdb); + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; if (count) after_str = RedisModule_LoadString(rdb); } + if (RedisModule_IsIOError(rdb)) + return REDISMODULE_ERR; return REDISMODULE_OK; } void testrdb_type_free(void *value) { - RedisModule_FreeString(NULL, (RedisModuleString*)value); + if (value) + RedisModule_FreeString(NULL, (RedisModuleString*)value); } int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) @@ -171,6 +180,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + if (argc > 0) RedisModule_StringToLongLong(argv[0], &conf_aux_count); diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl index 22201a08e..c72570002 100644 --- a/tests/unit/moduleapi/testrdb.tcl +++ b/tests/unit/moduleapi/testrdb.tcl @@ -56,7 +56,67 @@ tags "modules" { } } + tags {repl} { + test {diskless loading short read with module} { + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] - # TODO: test short read handling + # Set master and replica to use diskless replication + $master config set repl-diskless-sync yes + $master config set rdbcompression no + $replica config set repl-diskless-load swapdb + for {set k 0} {$k < 30} {incr k} { + r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]] + } + # Start the replication process... + $master config set repl-diskless-sync-delay 0 + $replica replicaof $master_host $master_port + + # kill the replication at various points + set attempts 3 + if {$::accurate} { set attempts 10 } + for {set i 0} {$i < $attempts} {incr i} { + # wait for the replica to start reading the rdb + # using the log file since the replica only responds to INFO once in 2mb + wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1 + + # add some additional random sleep so that we kill the master on a different place each time + after [expr {int(rand()*100)}] + + # kill the replica connection on the master + set killed [$master client kill type replica] + + if {[catch { + set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10] + if {$::verbose} { + puts $res + } + }]} { + puts "failed triggering short read" + # force the replica to try another full sync + $master client kill type replica + $master set asdf asdf + # the side effect of resizing the backlog is that it is flushed (16k is the min size) + $master config set repl-backlog-size [expr {16384 + $i}] + } + # wait for loading to stop (fail) + wait_for_condition 100 10 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + } + # enable fast shutdown + $master config set rdb-key-save-delay 0 + } + } + } + } } From 40c4183196b8c6e1c44c27400965786b9f010c74 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 30 Jul 2019 16:32:58 +0300 Subject: [PATCH 4/4] Log message when modules prevent diskless-load --- src/replication.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 14ae2f96c..614aaec83 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1118,8 +1118,11 @@ static int useDisklessLoad() { int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */ - if (enabled && !moduleAllDatatypesHandleErrors()) + if (enabled && !moduleAllDatatypesHandleErrors()) { + serverLog(LL_WARNING, + "Skipping diskless-load because there are modules that don't handle read errors."); enabled = 0; + } return enabled; }