From e978bdf9ef62404083afed28d98e7a455bcecd3b Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Wed, 23 Oct 2019 11:53:15 +0300 Subject: [PATCH 1/7] Module API for controlling LRU and LFU, and OpenKey without TOUCH Some commands would want to open a key without touching it's LRU/LFU similarly to the OBJECT or DEBUG command do. Other commands may want to implement logic similar to what RESTORE does (and in the future MIGRATE) and get/set the LRU or LFU. --- src/db.c | 8 ++++++-- src/module.c | 39 +++++++++++++++++++++++++++++++++++++-- src/object.c | 5 ++++- src/redismodule.h | 8 ++++++++ src/server.h | 3 ++- 5 files changed, 57 insertions(+), 6 deletions(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..14f163a8f 100644 --- a/src/db.c +++ b/src/db.c @@ -151,9 +151,13 @@ robj *lookupKeyRead(redisDb *db, robj *key) { * * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ -robj *lookupKeyWrite(redisDb *db, robj *key) { +robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_NONE); + return lookupKey(db,key,flags); +} + +robj *lookupKeyWrite(redisDb *db, robj *key) { + return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); } robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { diff --git a/src/module.c b/src/module.c index 2a1bda879..09cf6ac7a 100644 --- a/src/module.c +++ b/src/module.c @@ -1830,11 +1830,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { RedisModuleKey *kp; robj *value; + int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0; if (mode & REDISMODULE_WRITE) { - value = lookupKeyWrite(ctx->client->db,keyname); + value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags); } else { - value = lookupKeyRead(ctx->client->db,keyname); + value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags); if (value == NULL) { return NULL; } @@ -6397,6 +6398,38 @@ size_t moduleCount(void) { return dictSize(modules); } +/* Set the key LRU/LFU depending on server.maxmemory_policy. + * The lru_idle arg is idle time in seconds, and is only relevant if the + * eviction policy is LRU based. + * The lfu_freq arg is a logarithmic counter that provides an indication of + * the access frequencyonly (must be <= 255) and is only relevant if the + * eviction policy is LFU based. + * Either or both of them may be <0, in that case, nothing is set. */ +/* return value is an indication if the lru field was updated or not. */ +int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) { + if (!key->value) + return REDISMODULE_ERR; + if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0)) + return REDISMODULE_OK; + return REDISMODULE_ERR; +} + +/* Gets the key LRU or LFU (depending on the current eviction policy). + * One will be set to the appropiate return value, and the other will be set to -1. + * see RedisModule_SetLRUOrLFU for units and ranges. + * return value is an indication of success. */ +int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) { + *lru_idle = *lfu_freq = -1; + if (!key->value) + return REDISMODULE_ERR; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + *lfu_freq = LFUDecrAndReturn(key->value); + } else { + *lru_idle = estimateObjectIdleTime(key->value)/1000; + } + return REDISMODULE_OK; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -6589,4 +6622,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); REGISTER_API(SubscribeToServerEvent); + REGISTER_API(SetLRUOrLFU); + REGISTER_API(GetLRUOrLFU); } diff --git a/src/object.c b/src/object.c index 70022f897..47b21ae1f 100644 --- a/src/object.c +++ b/src/object.c @@ -1209,12 +1209,13 @@ sds getMemoryDoctorReport(void) { * The lru_idle and lru_clock args are only relevant if policy * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; + return 1; } } else if (lru_idle >= 0) { /* Provided LRU idle time is in seconds. Scale @@ -1231,7 +1232,9 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, if (lru_abs < 0) lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX; val->lru = lru_abs; + return 1; } + return 0; } /* ======================= The OBJECT and MEMORY commands =================== */ diff --git a/src/redismodule.h b/src/redismodule.h index 7053840b2..96e9fb4fa 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -18,6 +18,10 @@ #define REDISMODULE_READ (1<<0) #define REDISMODULE_WRITE (1<<1) +/* RedisModule_OpenKey extra flags for the 'mode' argument. + * Avoid touching the LRU/LFU of the key when opened. */ +#define REDISMODULE_OPEN_KEY_NOTOUCH (1<<16) + #define REDISMODULE_LIST_HEAD 0 #define REDISMODULE_LIST_TAIL 1 @@ -503,6 +507,8 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); +int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); +int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -703,6 +709,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(SubscribeToServerEvent); + REDISMODULE_GET_API(SetLRUOrLFU); + REDISMODULE_GET_API(GetLRUOrLFU); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/server.h b/src/server.h index 97672d727..30b25a918 100644 --- a/src/server.h +++ b/src/server.h @@ -2079,9 +2079,10 @@ robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); +robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags); robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); -void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, +int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) From 51c3ff8d75d9c784297ad587a31a37acd89499d8 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 29 Oct 2019 17:59:09 +0200 Subject: [PATCH 2/7] Modules hooks: complete missing hooks for the initial set of hooks * replication hooks: role change, master link status, replica online/offline * persistence hooks: saving, loading, loading progress * misc hooks: cron loop, shutdown, module loaded/unloaded * change the way hooks test work, and add tests for all of the above startLoading() now gets flag indicating what is loaded. stopLoading() now gets an indication of success or failure. adding startSaving() and stopSaving() with similar args and role. --- src/aof.c | 14 +- src/debug.c | 2 +- src/module.c | 161 +++++++++++++++++---- src/networking.c | 5 + src/rdb.c | 80 ++++++++--- src/rdb.h | 11 +- src/redis-check-rdb.c | 6 +- src/redismodule.h | 86 +++++++++-- src/replication.c | 55 ++++++- src/server.c | 11 +- src/server.h | 10 +- tests/modules/hooks.c | 256 +++++++++++++++++++++++++++++++-- tests/unit/moduleapi/hooks.tcl | 134 +++++++++++++++-- 13 files changed, 737 insertions(+), 94 deletions(-) diff --git a/src/aof.c b/src/aof.c index 0e3648ff0..dda9579e3 100644 --- a/src/aof.c +++ b/src/aof.c @@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createAOFClient(); - startLoadingFile(fp, filename); + startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ @@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,NULL,1) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(1); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; - stopLoading(); + stopLoading(1); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; @@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); + startSaving(RDBFLAGS_AOF_PREAMBLE); + if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } @@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) { if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } diff --git a/src/debug.c b/src/debug.c index 179f6d2c9..a2d37337d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -417,7 +417,7 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(server.rdb_filename,NULL); + int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/module.c b/src/module.c index 2a1bda879..57e388881 100644 --- a/src/module.c +++ b/src/module.c @@ -1620,6 +1620,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { return REDISMODULE_OK; } +/* This is an helper for moduleFireServerEvent() and other functions: + * It populates the replication info structure with the appropriate + * fields depending on the version provided. If the version is not valid + * then REDISMODULE_ERR is returned. Otherwise the function returns + * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */ +int modulePopulateReplicationInfoStructure(void *ri, int structver) { + if (structver != 1) return REDISMODULE_ERR; + + RedisModuleReplicationInfoV1 *ri1 = ri; + memset(ri1,0,sizeof(*ri1)); + ri1->version = structver; + ri1->master = server.masterhost==NULL; + ri1->masterhost = server.masterhost? server.masterhost: ""; + ri1->masterport = server.masterport; + ri1->replid1 = server.replid; + ri1->replid2 = server.replid2; + ri1->repl1_offset = server.master_repl_offset; + ri1->repl2_offset = server.second_replid_offset; + return REDISMODULE_OK; +} + /* Return information about the client with the specified ID (that was * previously obtained via the RedisModule_GetClientId() API). If the * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR @@ -5780,8 +5801,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The following sub events are available: * - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA * * The 'data' field can be casted by the callback to a * RedisModuleReplicationInfo structure with the following fields: @@ -5791,24 +5812,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int masterport; // master instance port for NOW_REPLICA * char *replid1; // Main replication ID * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset * uint64_t repl2_offset; // Offset of replid2 validity - * uint64_t main_repl_offset; // Replication offset * * RedisModuleEvent_Persistence * * This event is called when RDB saving or AOF rewriting starts * and ends. The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start - * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end - * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start - * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end + * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START + * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED + * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED * * The above events are triggered not just when the user calls the * relevant commands like BGSAVE, but also when a saving operation * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * * RedisModuleEvent_FlushDB * @@ -5816,8 +5843,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * because of replication, after the replica synchronization) * happened. The following sub events are available: * - * REDISMODULE_EVENT_FLUSHDB_START - * REDISMODULE_EVENT_FLUSHDB_END + * REDISMODULE_SUBEVENT_FLUSHDB_START + * REDISMODULE_SUBEVENT_FLUSHDB_END * * The data pointer can be casted to a RedisModuleFlushInfo * structure with the following fields: @@ -5841,12 +5868,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica is loading the RDB file from the master. * The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START - * REDISMODULE_EVENT_LOADING_RDB_END - * REDISMODULE_EVENT_LOADING_MASTER_RDB_START - * REDISMODULE_EVENT_LOADING_MASTER_RDB_END - * REDISMODULE_EVENT_LOADING_AOF_START - * REDISMODULE_EVENT_LOADING_AOF_END + * REDISMODULE_SUBEVENT_LOADING_RDB_START + * REDISMODULE_SUBEVENT_LOADING_AOF_START + * REDISMODULE_SUBEVENT_LOADING_REPL_START + * REDISMODULE_SUBEVENT_LOADING_ENDED + * REDISMODULE_SUBEVENT_LOADING_FAILED + * + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only recieve an AOF_START event. + * * * RedisModuleEvent_ClientChange * @@ -5855,8 +5885,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * structure, documented in RedisModule_GetClientInfoById(). * The following sub events are available: * - * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED * * RedisModuleEvent_Shutdown * @@ -5869,8 +5899,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica since it gets disconnected. * The following sub events are availble: * - * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE * * No additional information is available so far: future versions * of Redis will have an API in order to enumerate the replicas @@ -5885,6 +5915,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * this changes depending on the "hz" configuration. * No sub events are available. * + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * * RedisModuleEvent_MasterLinkChange * * This is called for replicas in order to notify when the @@ -5894,8 +5929,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replication is happening correctly. * The following sub events are available: * - * REDISMODULE_EVENT_MASTER_LINK_UP - * REDISMODULE_EVENT_MASTER_LINK_DOWN + * REDISMODULE_SUBEVENT_MASTER_LINK_UP + * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * + * RedisModuleEvent_ModuleChange + * + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_MODULE_LOADED + * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: + * + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. + * + * RedisModuleEvent_LoadingProgress + * + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * or -1 if unknown. * * The function returns REDISMODULE_OK if the module was successfully subscrived * for the specified event. If the API is called from a wrong context then @@ -5954,7 +6019,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { listRewind(RedisModule_EventListeners,&li); while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; - if (el->event.id == eid && !el->module->in_hook) { + if (el->event.id == eid) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = el->module; @@ -5968,6 +6033,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { void *moduledata = NULL; RedisModuleClientInfoV1 civ1; + RedisModuleReplicationInfoV1 riv1; + RedisModuleModuleChangeV1 mcv1; /* Start at DB zero by default when calling the handler. It's * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct @@ -5979,11 +6046,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { modulePopulateClientInfoStructure(&civ1,data, el->event.dataver); moduledata = &civ1; + } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) { + modulePopulateReplicationInfoStructure(&riv1,el->event.dataver); + moduledata = &riv1; } else if (eid == REDISMODULE_EVENT_FLUSHDB) { moduledata = data; RedisModuleFlushInfoV1 *fi = data; if (fi->dbnum != -1) selectDb(ctx.client, fi->dbnum); + } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { + RedisModule *m = data; + if (m == el->module) + continue; + mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; + mcv1.module_name = m->name; + mcv1.module_version = m->ver; + moduledata = &mcv1; + } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) { + moduledata = data; + } else if (eid == REDISMODULE_EVENT_CRON_LOOP) { + moduledata = data; } ModulesInHooks++; @@ -6015,6 +6097,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } } +void processModuleLoadingProgressEvent(int is_aof) { + long long now = ustime(); + static long long next_event = 0; + if (now >= next_event) { + /* Fire the loading progress modules end event. */ + int progress = -1; + if (server.loading_total_bytes) + progress = (server.loading_total_bytes<<10) / server.loading_total_bytes; + RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION, + server.hz, + progress}; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS, + is_aof? + REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF: + REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB, + &fi); + /* decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -6183,6 +6286,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); + /* Fire the loaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_LOADED, + ctx.module); + moduleFreeContext(&ctx); return C_OK; } @@ -6245,6 +6353,11 @@ int moduleUnload(sds name) { module->name, error); } + /* Fire the unloaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_UNLOADED, + module); + /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); diff --git a/src/networking.c b/src/networking.c index e7cc561fa..9336c177c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1118,6 +1118,11 @@ void freeClient(client *c) { if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + if (c->replstate == SLAVE_STATE_ONLINE) + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, + NULL); } /* Master/slave cleanup Case 2: diff --git a/src/rdb.c b/src/rdb.c index f530219a4..b569edfea 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { +int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; + int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; @@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { @@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && + if (rdbflags & RDBFLAGS_AOF_PREAMBLE && rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = rdb->processed_bytes; @@ -1254,18 +1254,21 @@ werr: int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; + startSaving(RDBFLAGS_REPLICATION); getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); if (error) *error = 0; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; + stopSaving(1); return C_OK; werr: /* Write error. */ /* Set 'error' only if not already set by rdbSaveRio() call. */ if (error && *error == 0) *error = errno; + stopSaving(0); return C_ERR; } @@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { } rioInitWithFile(&rdb,fp); + startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ -void startLoading(size_t size) { +void startLoading(size_t size, int rdbflags) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; + + /* Fire the loading modules start event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START; + else if(rdbflags & RDBFLAGS_REPLICATION) + subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START; + else + subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL); } /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. * 'filename' is optional and used for rdb-check on error */ -void startLoadingFile(FILE *fp, char* filename) { +void startLoadingFile(FILE *fp, char* filename, int rdbflags) { struct stat sb; if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; rdbFileBeingLoaded = filename; - startLoading(sb.st_size); + startLoading(sb.st_size, rdbflags); } /* Refresh the loading progress info */ @@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) { } /* Loading finished */ -void stopLoading(void) { +void stopLoading(int success) { server.loading = 0; rdbFileBeingLoaded = NULL; + + /* Fire the loading modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_LOADING, + success? + REDISMODULE_SUBEVENT_LOADING_ENDED: + REDISMODULE_SUBEVENT_LOADING_FAILED, + NULL); +} + +void startSaving(int rdbflags) { + /* Fire the persistence modules end event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; + else if (getpid()!=server.pid) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; + else + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL); +} + +void stopSaving(int success) { + /* Fire the persistence modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, + success? + REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: + REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, + NULL); } /* Track loading progress in order to serve client's from time to time @@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(0); } } /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { + if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { @@ -2243,17 +2289,17 @@ eoferr: * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi) { +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { FILE *fp; rio rdb; int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoadingFile(fp, filename); + startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rsi,0); + retval = rdbLoadRio(&rdb,rdbflags,rsi); fclose(fp); - stopLoading(); + stopLoading(retval==C_OK); return retval; } diff --git a/src/rdb.h b/src/rdb.h index 40a50f7ba..4229beea8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -121,8 +121,10 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) -#define RDB_SAVE_NONE 0 -#define RDB_SAVE_AOF_PREAMBLE (1<<0) +/* flags on the purpose of rdb save or load */ +#define RDBFLAGS_NONE 0 +#define RDBFLAGS_AOF_PREAMBLE (1<<0) +#define RDBFLAGS_REPLICATION (1<<1) int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); @@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); @@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 5e7415046..1210d49b4 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoadingFile(fp, rdbfilename); + startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE); while(1) { robj *key, *val; @@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); - stopLoading(); + stopLoading(1); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); - stopLoading(); + stopLoading(0); return 1; } diff --git a/src/redismodule.h b/src/redismodule.h index 7053840b2..7ae1ad692 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -181,6 +181,8 @@ typedef uint64_t RedisModuleTimerID; #define REDISMODULE_EVENT_REPLICA_CHANGE 6 #define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7 #define REDISMODULE_EVENT_CRON_LOOP 8 +#define REDISMODULE_EVENT_MODULE_CHANGE 9 +#define REDISMODULE_EVENT_LOADING_PROGRESS 10 typedef struct RedisModuleEvent { uint64_t id; /* REDISMODULE_EVENT_... defines. */ @@ -226,18 +228,28 @@ static const RedisModuleEvent RedisModuleEvent_MasterLinkChange = { REDISMODULE_EVENT_MASTER_LINK_CHANGE, 1 + }, + RedisModuleEvent_ModuleChange = { + REDISMODULE_EVENT_MODULE_CHANGE, + 1 + }, + RedisModuleEvent_LoadingProgress = { + REDISMODULE_EVENT_LOADING_PROGRESS, + 1 }; /* Those are values that are used for the 'subevent' callback argument. */ #define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0 -#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1 +#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2 +#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4 #define REDISMODULE_SUBEVENT_LOADING_RDB_START 0 -#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1 -#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2 -#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3 +#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1 +#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2 +#define REDISMODULE_SUBEVENT_LOADING_ENDED 3 +#define REDISMODULE_SUBEVENT_LOADING_FAILED 4 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1 @@ -245,12 +257,21 @@ static const RedisModuleEvent #define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0 #define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1 + +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0 +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1 #define REDISMODULE_SUBEVENT_FLUSHDB_START 0 #define REDISMODULE_SUBEVENT_FLUSHDB_END 1 +#define REDISMODULE_SUBEVENT_MODULE_LOADED 0 +#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1 + +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0 +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1 + /* RedisModuleClientInfo flags. */ #define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0) #define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1) @@ -286,6 +307,22 @@ typedef struct RedisModuleClientInfo { #define RedisModuleClientInfo RedisModuleClientInfoV1 +#define REDISMODULE_REPLICATIONINFO_VERSION 1 +typedef struct RedisModuleReplicationInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int master; /* true if master, false if replica */ + char *masterhost; /* master instance hostname for NOW_REPLICA */ + int masterport; /* master instance port for NOW_REPLICA */ + char *replid1; /* Main replication ID */ + char *replid2; /* Secondary replication ID */ + uint64_t repl1_offset; /* Main replication offset */ + uint64_t repl2_offset; /* Offset of replid2 validity */ +} RedisModuleReplicationInfoV1; + +#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1 + #define REDISMODULE_FLUSHINFO_VERSION 1 typedef struct RedisModuleFlushInfo { uint64_t version; /* Not used since this structure is never passed @@ -297,6 +334,39 @@ typedef struct RedisModuleFlushInfo { #define RedisModuleFlushInfo RedisModuleFlushInfoV1 +#define REDISMODULE_MODULE_CHANGE_VERSION 1 +typedef struct RedisModuleModuleChange { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + const char* module_name;/* Name of module loaded or unloaded. */ + int32_t module_version; /* Module version. */ +} RedisModuleModuleChangeV1; + +#define RedisModuleModuleChange RedisModuleModuleChangeV1 + +#define REDISMODULE_CRON_LOOP_VERSION 1 +typedef struct RedisModuleCronLoopInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ +} RedisModuleCronLoopV1; + +#define RedisModuleCronLoop RedisModuleCronLoopV1 + +#define REDISMODULE_LOADING_PROGRESS_VERSION 1 +typedef struct RedisModuleLoadingProgressInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ + int32_t progress; /* Approximate progress between 0 and 1024, or -1 + * if unknown. */ +} RedisModuleLoadingProgressV1; + +#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1 + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE diff --git a/src/replication.c b/src/replication.c index 4550e6a83..c9a2e0fe1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) { * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); + + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); + return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: @@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) { return; } refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); } @@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) { * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION); - if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) { /* RDB loading failed. */ - stopLoading(); + stopLoading(0); serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " "from socket"); @@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) { * gets promoted. */ return; } - stopLoading(); + stopLoading(1); /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(); return; } - if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk"); @@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) { server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* Fire the master link modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_UP, + NULL); + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ @@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) { replicationDiscardCachedMaster(); replicationCacheMasterUsingMyself(); } + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, + NULL); + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.repl_state = REPL_STATE_CONNECT; } /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + sdsfree(server.masterhost); server.masterhost = NULL; /* When a slave is turned into a master, the current replication ID @@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, + NULL); } /* This function is called when the slave lose the connection with the * master into an unexpected way. */ void replicationHandleMasterDisconnection(void) { + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.master = NULL; server.repl_state = REPL_STATE_CONNECT; server.repl_down_since = server.unixtime; diff --git a/src/server.c b/src/server.c index 8f165113d..f42924764 100644 --- a/src/server.c +++ b/src/server.c @@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { server.rdb_bgsave_scheduled = 0; } + /* Fire the cron loop modules event. */ + RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; + moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, + 0, + &ei); + server.cronloops++; return 1000/server.hz; } @@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) { } } + /* Fire the shutdown modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL); + /* Remove the pid file if possible and needed. */ if (server.daemonize || server.pidfile) { serverLog(LL_NOTICE,"Removing the pid file."); @@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) { serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); diff --git a/src/server.h b/src/server.h index 97672d727..b52d57a05 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +void processModuleLoadingProgressEvent(int is_aof); /* Utils */ long long ustime(void); @@ -1831,10 +1832,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); /* Generic persistence functions */ -void startLoadingFile(FILE* fp, char* filename); -void startLoading(size_t size); +void startLoadingFile(FILE* fp, char* filename, int rdbflags); +void startLoading(size_t size, int rdbflags); void loadingProgress(off_t pos); -void stopLoading(void); +void stopLoading(int success); +void startSaving(int rdbflags); +void stopSaving(int success); #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */ @@ -1843,7 +1846,6 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */ diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c index 33b690b2f..665a20481 100644 --- a/tests/modules/hooks.c +++ b/tests/modules/hooks.c @@ -30,36 +30,227 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" +#include +#include + +/* We need to store events to be able to test and see what we got, and we can't + * store them in the key-space since that would mess up rdb loading (duplicates) + * and be lost of flushdb. */ +RedisModuleDict *event_log = NULL; + +typedef struct EventElement { + long count; + RedisModuleString *last_val_string; + long last_val_int; +} EventElement; + +void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) { + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data)); + event->count++; +} + +void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) { + REDISMODULE_NOT_USED(ctx); + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + event->last_val_int = data; + event->count++; +} + +void FreeEvent(RedisModuleCtx *ctx, EventElement *event) { + if (event->last_val_string) + RedisModule_FreeString(ctx, event->last_val_string); + RedisModule_Free(event); +} + +int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + RedisModule_ReplyWithLongLong(ctx, event? event->count: 0); + return REDISMODULE_OK; +} + +int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + if (event && event->last_val_string) + RedisModule_ReplyWithString(ctx, event->last_val_string); + else if (event) + RedisModule_ReplyWithLongLong(ctx, event->last_val_int); + else + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; +} + +void clearEvents(RedisModuleCtx *ctx) +{ + RedisModuleString *key; + EventElement *event; + RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL); + while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) { + event->count = 0; + event->last_val_int = 0; + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = NULL; + RedisModule_DictDel(event_log, key, NULL); + RedisModule_Free(event); + } + RedisModule_DictIteratorStop(iter); +} + +int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argc); + REDISMODULE_NOT_USED(argv); + clearEvents(ctx); + return REDISMODULE_OK; +} /* Client state change callback. */ void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleClientInfo *ci = data; char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ? - "connected" : "disconnected"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id); - RedisModule_FreeCallReply(reply); + "client-connected" : "client-disconnected"; + LogNumericEvent(ctx, keyname, ci->id); } void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleFlushInfo *fi = data; char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? "flush-start" : "flush-end"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum); - RedisModule_FreeCallReply(reply); + LogNumericEvent(ctx, keyname, fi->dbnum); +} + +void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + RedisModuleReplicationInfo *ri = data; + char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ? + "role-master" : "role-replica"; + LogStringEvent(ctx, keyname, ri->masterhost); +} + +void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ? + "replica-online" : "replica-offline"; + LogNumericEvent(ctx, keyname, 0); +} + +void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ? + "masterlink-up" : "masterlink-down"; + LogNumericEvent(ctx, keyname, 0); +} + +void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break; + } + /* modifying the keyspace from the fork child is not an option, using log instead */ + RedisModule_Log(ctx, "warning", "module-event-%s", keyname); + if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START) + LogNumericEvent(ctx, keyname, 0); +} + +void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break; + case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break; + case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break; + case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break; + case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break; + } + LogNumericEvent(ctx, keyname, 0); +} + +void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleLoadingProgress *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ? + "loading-progress-rdb" : "loading-progress-aof"; + LogNumericEvent(ctx, keyname, ei->progress); +} + +void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + REDISMODULE_NOT_USED(sub); + + RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown"); +} + +void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(sub); + + RedisModuleCronLoop *ei = data; + LogNumericEvent(ctx, "cron-loop", ei->hz); +} + +void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleModuleChange *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ? + "module-loaded" : "module-unloaded"; + LogStringEvent(ctx, keyname, ei->module_name); } /* This function must be present on each Redis module. It is used in order to @@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + /* replication related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicaChange, replicationChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback); + + /* persistence related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Persistence, persistenceCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Loading, loadingCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_LoadingProgress, loadingProgressCallback); + + /* other hooks */ RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClientChange, clientChangeCallback); RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Shutdown, shutdownCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_CronLoop, cronLoopCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ModuleChange, moduleChangeCallback); + + event_log = RedisModule_CreateDict(ctx); + + if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + clearEvents(ctx); + RedisModule_FreeDict(ctx, event_log); + event_log = NULL; + return REDISMODULE_OK; +} + diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index 7a727902d..cbca9c3eb 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so] tags "modules" { start_server {} { r module load $testmodule + r config set appendonly yes + test {Test clients connection / disconnection hooks} { for {set j 0} {$j < 2} {incr j} { set rd1 [redis_deferring_client] $rd1 close } - assert {[r llen connected] > 1} - assert {[r llen disconnected] > 1} + assert {[r hooks.event_count client-connected] > 1} + assert {[r hooks.event_count client-disconnected] > 1} + } + + test {Test module cron hook} { + after 100 + assert {[r hooks.event_count cron-loop] > 0} + set hz [r hooks.event_last cron-loop] + assert_equal $hz 10 + } + + test {Test module loaded / unloaded hooks} { + set othermodule [file normalize tests/modules/infotest.so] + r module load $othermodule + r module unload infotest + assert_equal [r hooks.event_last module-loaded] "infotest" + assert_equal [r hooks.event_last module-unloaded] "infotest" + } + + test {Test module aofrw hook} { + r debug populate 1000 foo 10000 ;# 10mb worth of data + r config set rdbcompression no ;# rdb progress is only checked once in 2mb + r BGREWRITEAOF + waitForBgrewriteaof r + assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1 + assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1 + } + + test {Test module aof load and rdb/aof progress hooks} { + # create some aof tail (progress is checked only once in 1000 commands) + for {set j 0} {$j < 4000} {incr j} { + r set "bar$j" x + } + # set some configs that will cause many loading progress events during aof loading + r config set key-load-delay 1 + r config set dynamic-hz no + r config set hz 500 + r DEBUG LOADAOF + assert_equal [r hooks.event_last loading-aof-start] 0 + assert_equal [r hooks.event_last loading-end] 0 + assert {[r hooks.event_count loading-rdb-start] == 0} + assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section + assert {[r hooks.event_count loading-progress-aof] >= 2} + } + # undo configs before next test + r config set dynamic-hz yes + r config set key-load-delay 0 + + test {Test module rdb save hook} { + # debug reload does: save, flush, load: + assert {[r hooks.event_count persistence-syncrdb-start] == 0} + assert {[r hooks.event_count loading-rdb-start] == 0} + r debug reload + assert {[r hooks.event_count persistence-syncrdb-start] == 1} + assert {[r hooks.event_count loading-rdb-start] == 1} } test {Test flushdb hooks} { - r flushall ;# Note: only the "end" RPUSH will survive - r select 1 r flushdb - r select 2 - r flushdb - r select 9 - assert {[r llen flush-start] == 2} - assert {[r llen flush-end] == 3} - assert {[r lrange flush-start 0 -1] eq {1 2}} - assert {[r lrange flush-end 0 -1] eq {-1 1 2}} + assert_equal [r hooks.event_last flush-start] 9 + assert_equal [r hooks.event_last flush-end] 9 + r flushall + assert_equal [r hooks.event_last flush-start] -1 + assert_equal [r hooks.event_last flush-end] -1 } + + # replication related tests + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server {} { + r module load $testmodule + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [r info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + test {Test master link up hook} { + assert_equal [r hooks.event_count masterlink-up] 1 + assert_equal [r hooks.event_count masterlink-down] 0 + } + + test {Test role-replica hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 0 + assert_equal [r hooks.event_last role-replica] [s 0 master_host] + } + + test {Test replica-online hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 0 + } + + test {Test master link down hook} { + r client kill type master + assert_equal [r hooks.event_count masterlink-down] 1 + } + + $replica replicaof no one + + test {Test role-master hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 1 + assert_equal [r hooks.event_last role-master] {} + } + + test {Test replica-offline hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 1 + } + # get the replica stdout, to be used by the next test + set replica_stdout [srv 0 stdout] + } + + + # look into the log file of the server that just exited + test {Test shutdown hook} { + assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1 + } + } } From 779aebc91cd8d3043ab172e0bc5b8c988df88e33 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 Nov 2019 16:42:31 +0200 Subject: [PATCH 3/7] Module API for loading and saving long double looks like each platform implements long double differently (different bit count) so we can't save them as binary, and we also want to avoid creating a new RDB format version, so we save these are hex strings using "%La". This commit includes a change in the arguments of ld2string to support this. as well as tests for coverage and short reads. coded by @guybe7 --- src/module.c | 27 ++++++++++++++++++ src/networking.c | 2 +- src/object.c | 2 +- src/redismodule.h | 4 +++ src/t_hash.c | 2 +- src/util.c | 62 ++++++++++++++++++++++++----------------- src/util.h | 9 +++++- tests/modules/testrdb.c | 9 +++++- 8 files changed, 87 insertions(+), 30 deletions(-) diff --git a/src/module.c b/src/module.c index f9f654b42..68d1d023d 100644 --- a/src/module.c +++ b/src/module.c @@ -3716,6 +3716,31 @@ loaderr: return 0; } +/* In the context of the rdb_save method of a module data type, saves a long double + * value to the RDB file. The double can be a valid number, a NaN or infinity. + * It is possible to load back the value with RedisModule_LoadLongDouble(). */ +void RM_SaveLongDouble(RedisModuleIO *io, long double value) { + if (io->error) return; + char buf[MAX_LONG_DOUBLE_CHARS]; + /* Long double has different number of bits in different platforms, so we + * save it as a string type. */ + size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX); + RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */ +} + +/* In the context of the rdb_save method of a module data type, loads back the + * long double value saved by RedisModule_SaveLongDouble(). */ +long double RM_LoadLongDouble(RedisModuleIO *io) { + if (io->error) return 0; + long double value; + size_t len; + char* str = RM_LoadStringBuffer(io,&len); + if (!str) return 0; + string2ld(str,len,&value); + RM_Free(str); + return value; +} + /* Iterate over modules, and trigger rdb aux saving for the ones modules types * who asked for it. */ ssize_t rdbSaveModulesAux(rio *rdb, int when) { @@ -6669,6 +6694,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(LoadDouble); REGISTER_API(SaveFloat); REGISTER_API(LoadFloat); + REGISTER_API(SaveLongDouble); + REGISTER_API(LoadLongDouble); REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); diff --git a/src/networking.c b/src/networking.c index e7cc561fa..428ab14ce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -530,7 +530,7 @@ void addReplyHumanLongDouble(client *c, long double d) { decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),d,1); + int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); addReplyProto(c,",",1); addReplyProto(c,buf,len); addReplyProto(c,"\r\n",2); diff --git a/src/object.c b/src/object.c index 70022f897..53ad518a9 100644 --- a/src/object.c +++ b/src/object.c @@ -178,7 +178,7 @@ robj *createStringObjectFromLongLongForValue(long long value) { * The 'humanfriendly' option is used for INCRBYFLOAT and HINCRBYFLOAT. */ robj *createStringObjectFromLongDouble(long double value, int humanfriendly) { char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),value,humanfriendly); + int len = ld2string(buf,sizeof(buf),value,humanfriendly? LD_STR_HUMAN: LD_STR_AUTO); return createStringObject(buf,len); } diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..54d198592 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -457,6 +457,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double valu double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value); float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); +void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value); +long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); @@ -658,6 +660,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(LoadDouble); REDISMODULE_GET_API(SaveFloat); REDISMODULE_GET_API(LoadFloat); + REDISMODULE_GET_API(SaveLongDouble); + REDISMODULE_GET_API(LoadLongDouble); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); diff --git a/src/t_hash.c b/src/t_hash.c index e6ed33819..b9f0db7fc 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -621,7 +621,7 @@ void hincrbyfloatCommand(client *c) { } char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),value,1); + int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN); new = sdsnewlen(buf,len); hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE); addReplyBulkCBuffer(c,buf,len); diff --git a/src/util.c b/src/util.c index 783bcf83b..062a572d4 100644 --- a/src/util.c +++ b/src/util.c @@ -510,15 +510,17 @@ int d2string(char *buf, size_t len, double value) { return len; } -/* Convert a long double into a string. If humanfriendly is non-zero - * it does not use exponential format and trims trailing zeroes at the end, - * however this results in loss of precision. Otherwise exp format is used - * and the output of snprintf() is not modified. +/* Create a string object from a long double. + * If mode is humanfriendly it does not use exponential format and trims trailing + * zeroes at the end (may result in loss of precision). + * If mode is default exp format is used and the output of snprintf() + * is not modified (may result in loss of precision). + * If mode is hex hexadecimal format is used (no loss of precision) * * The function returns the length of the string or zero if there was not * enough buffer room to store it. */ -int ld2string(char *buf, size_t len, long double value, int humanfriendly) { - size_t l; +int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) { + size_t l = 0; if (isinf(value)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a @@ -531,26 +533,36 @@ int ld2string(char *buf, size_t len, long double value, int humanfriendly) { memcpy(buf,"-inf",4); l = 4; } - } else if (humanfriendly) { - /* We use 17 digits precision since with 128 bit floats that precision - * after rounding is able to represent most small decimal numbers in a - * way that is "non surprising" for the user (that is, most small - * decimal numbers will be represented in a way that when converted - * back into a string are exactly the same as what the user typed.) */ - l = snprintf(buf,len,"%.17Lf", value); - if (l+1 > len) return 0; /* No room. */ - /* Now remove trailing zeroes after the '.' */ - if (strchr(buf,'.') != NULL) { - char *p = buf+l-1; - while(*p == '0') { - p--; - l--; - } - if (*p == '.') l--; - } } else { - l = snprintf(buf,len,"%.17Lg", value); - if (l+1 > len) return 0; /* No room. */ + switch (mode) { + case LD_STR_AUTO: + l = snprintf(buf,len,"%.17Lg",value); + if (l+1 > len) return 0; /* No room. */ + break; + case LD_STR_HEX: + l = snprintf(buf,len,"%La",value); + if (l+1 > len) return 0; /* No room. */ + break; + case LD_STR_HUMAN: + /* We use 17 digits precision since with 128 bit floats that precision + * after rounding is able to represent most small decimal numbers in a + * way that is "non surprising" for the user (that is, most small + * decimal numbers will be represented in a way that when converted + * back into a string are exactly the same as what the user typed.) */ + l = snprintf(buf,len,"%.17Lf",value); + if (l+1 > len) return 0; /* No room. */ + /* Now remove trailing zeroes after the '.' */ + if (strchr(buf,'.') != NULL) { + char *p = buf+l-1; + while(*p == '0') { + p--; + l--; + } + if (*p == '.') l--; + } + break; + default: return 0; /* Invalid mode. */ + } } buf[l] = '\0'; return l; diff --git a/src/util.h b/src/util.h index b6c01aa59..a91addb80 100644 --- a/src/util.h +++ b/src/util.h @@ -38,6 +38,13 @@ * This should be the size of the buffer given to ld2string */ #define MAX_LONG_DOUBLE_CHARS 5*1024 +/* long double to string convertion options */ +typedef enum { + LD_STR_AUTO, /* %.17Lg */ + LD_STR_HUMAN, /* %.17Lf + Trimming of trailing zeros */ + LD_STR_HEX /* %La */ +} ld2string_mode; + int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase); int stringmatch(const char *p, const char *s, int nocase); int stringmatchlen_fuzz_test(void); @@ -49,7 +56,7 @@ int string2ll(const char *s, size_t slen, long long *value); int string2l(const char *s, size_t slen, long *value); int string2ld(const char *s, size_t slen, long double *dp); int d2string(char *buf, size_t len, double value); -int ld2string(char *buf, size_t len, long double value, int humanfriendly); +int ld2string(char *buf, size_t len, long double value, ld2string_mode mode); sds getAbsolutePath(char *filename); unsigned long getTimeZone(void); int pathIsBaseName(char *path); diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index eb8d1a999..8a262e8a7 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -15,11 +15,16 @@ RedisModuleString *after_str = NULL; void *testrdb_type_load(RedisModuleIO *rdb, int encver) { int count = RedisModule_LoadSigned(rdb); + RedisModuleString *str = RedisModule_LoadString(rdb); + float f = RedisModule_LoadFloat(rdb); + long double ld = RedisModule_LoadLongDouble(rdb); if (RedisModule_IsIOError(rdb)) return NULL; + /* Using the values only after checking for io errors. */ assert(count==1); assert(encver==1); - RedisModuleString *str = RedisModule_LoadString(rdb); + assert(f==1.5f); + assert(ld==0.333333333333333333L); return str; } @@ -27,6 +32,8 @@ void testrdb_type_save(RedisModuleIO *rdb, void *value) { RedisModuleString *str = (RedisModuleString*)value; RedisModule_SaveSigned(rdb, 1); RedisModule_SaveString(rdb, str); + RedisModule_SaveFloat(rdb, 1.5); + RedisModule_SaveLongDouble(rdb, 0.333333333333333333L); } void testrdb_aux_save(RedisModuleIO *rdb, int when) { From 87332ce524e30d4949d2144a4da15b5ae17e5051 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 Nov 2019 17:35:35 +0200 Subject: [PATCH 4/7] Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE --- src/db.c | 44 ++++++++++++++++++++++++-------------------- src/module.c | 35 +++++++++++++++++++++++++++++++++++ src/redismodule.h | 8 ++++++++ src/server.h | 2 ++ 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..ad19b42dd 100644 --- a/src/db.c +++ b/src/db.c @@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) { return C_OK; } +/* Flushes the whole server data set. */ +void flushAllDataAndResetRDB(int flags) { + server.dirty += emptyDb(-1,flags,NULL); + if (server.rdb_child_pid != -1) killRDBChild(); + if (server.saveparamslen > 0) { + /* Normally rdbSave() will reset dirty, but we don't want this here + * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ + int saved_dirty = server.dirty; + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSave(server.rdb_filename,rsiptr); + server.dirty = saved_dirty; + } + server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif +} + /* FLUSHDB [ASYNC] * * Flushes the currently SELECTed Redis DB. */ @@ -484,28 +507,9 @@ void flushdbCommand(client *c) { * Flushes the whole server data set. */ void flushallCommand(client *c) { int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - server.dirty += emptyDb(-1,flags,NULL); + flushAllDataAndResetRDB(flags); addReply(c,shared.ok); - if (server.rdb_child_pid != -1) killRDBChild(); - if (server.saveparamslen > 0) { - /* Normally rdbSave() will reset dirty, but we don't want this here - * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ - int saved_dirty = server.dirty; - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(server.rdb_filename,rsiptr); - server.dirty = saved_dirty; - } - server.dirty++; -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchroneus. */ - if (!(flags & EMPTYDB_ASYNC)) - jemalloc_purge(); -#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/module.c b/src/module.c index f9f654b42..5453aed47 100644 --- a/src/module.c +++ b/src/module.c @@ -1677,6 +1677,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { return modulePopulateClientInfoStructure(ci,client,structver); } +/* Publish a message to subscribers (see PUBLISH command). */ +int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { + UNUSED(ctx); + int receivers = pubsubPublishMessage(channel, message); + if (server.cluster_enabled) + clusterPropagatePublish(channel, message); + return receivers; +} + /* Return the currently selected DB. */ int RM_GetSelectedDb(RedisModuleCtx *ctx) { return ctx->client->db->id; @@ -1964,6 +1973,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_OK; } +/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled) + * If restart_aof is true, you must make sure the command that triggered this call is not + * propagated to the AOF file. + * When async is set to true, db contents will be freed by a background thread. */ +void RM_ResetDataset(int restart_aof, int async) { + if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly(); + flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + if (server.aof_enabled && restart_aof) restartAOFAfterSYNC(); +} + +/* Returns the number of keys in the current db. */ +unsigned long long RM_DbSize(RedisModuleCtx *ctx) { + return dictSize(ctx->client->db->dict); +} + +/* Returns a name of a random key, or NULL if current db is empty. */ +RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { + robj *key = dbRandomKey(ctx->client->db); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key); + return key; +} + /* -------------------------------------------------------------------------- * Key API for String type * -------------------------------------------------------------------------- */ @@ -6630,6 +6661,9 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringTruncate); REGISTER_API(SetExpire); REGISTER_API(GetExpire); + REGISTER_API(ResetDataset); + REGISTER_API(DbSize); + REGISTER_API(RandomKey); REGISTER_API(ZsetAdd); REGISTER_API(ZsetIncrby); REGISTER_API(ZsetScore); @@ -6757,6 +6791,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(InfoAddFieldLongLong); REGISTER_API(InfoAddFieldULongLong); REGISTER_API(GetClientInfoById); + REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..77019f89e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -416,6 +416,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen); mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire); +void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async); +unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr); int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore); int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score); @@ -435,6 +438,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos); unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx); int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id); +int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message); int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); @@ -619,6 +623,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringTruncate); REDISMODULE_GET_API(GetExpire); REDISMODULE_GET_API(SetExpire); + REDISMODULE_GET_API(ResetDataset); + REDISMODULE_GET_API(DbSize); + REDISMODULE_GET_API(RandomKey); REDISMODULE_GET_API(ZsetAdd); REDISMODULE_GET_API(ZsetIncrby); REDISMODULE_GET_API(ZsetScore); @@ -705,6 +712,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(InfoAddFieldLongLong); REDISMODULE_GET_API(InfoAddFieldULongLong); REDISMODULE_GET_API(GetClientInfoById); + REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); diff --git a/src/server.h b/src/server.h index f724f7d64..4287d1adf 100644 --- a/src/server.h +++ b/src/server.h @@ -1862,6 +1862,7 @@ void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); ssize_t aofReadDiffFromParent(void); void killAppendOnlyChild(void); +void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); @@ -2101,6 +2102,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +void flushAllDataAndResetRDB(int flags); long long dbTotalServerKeyCount(); int selectDb(client *c, int id); From b81f486c2f0d9f8ae14fc7c0568ba59e629995d6 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Thu, 29 Mar 2018 17:46:13 +0700 Subject: [PATCH 5/7] Modules: Test RedisModule_BlockClientOnKeys --- runtest-moduleapi | 1 + tests/modules/Makefile | 3 +- tests/modules/blockonkeys.c | 261 +++++++++++++++++++++++++++ tests/unit/moduleapi/blockonkeys.tcl | 85 +++++++++ 4 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 tests/modules/blockonkeys.c create mode 100644 tests/unit/moduleapi/blockonkeys.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index 9301002c9..e48535126 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/propagate \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ +--single unit/moduleapi/blockonkeys \ "${@}" diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 71c0b5ef8..9e27758a2 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -18,7 +18,8 @@ TEST_MODULES = \ infotest.so \ propagate.so \ misc.so \ - hooks.so + hooks.so \ + blockonkeys.so .PHONY: all diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c new file mode 100644 index 000000000..959918b1c --- /dev/null +++ b/tests/modules/blockonkeys.c @@ -0,0 +1,261 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define LIST_SIZE 1024 + +typedef struct { + long long list[LIST_SIZE]; + long long length; +} fsl_t; /* Fixed-size list */ + +static RedisModuleType *fsltype = NULL; + +fsl_t *fsl_type_create() { + fsl_t *o; + o = RedisModule_Alloc(sizeof(*o)); + o->length = 0; + return o; +} + +void fsl_type_free(fsl_t *o) { + RedisModule_Free(o); +} + +/* ========================== "fsltype" type methods ======================= */ + +void *fsl_rdb_load(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + return NULL; + } + fsl_t *fsl = fsl_type_create(); + fsl->length = RedisModule_LoadUnsigned(rdb); + for (long long i = 0; i < fsl->length; i++) + fsl->list[i] = RedisModule_LoadSigned(rdb); + return fsl; +} + +void fsl_rdb_save(RedisModuleIO *rdb, void *value) { + fsl_t *fsl = value; + RedisModule_SaveUnsigned(rdb,fsl->length); + for (long long i = 0; i < fsl->length; i++) + RedisModule_SaveSigned(rdb, fsl->list[i]); +} + +void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) { + fsl_t *fsl = value; + for (long long i = 0; i < fsl->length; i++) + RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]); +} + +void fsl_free(void *value) { + fsl_type_free(value); +} + +/* ========================== helper methods ======================= */ + +int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) { + RedisModule_CloseKey(key); + if (reply_on_failure) + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + return 0; + } + + /* Create an empty value object if the key is currently empty. */ + if (type == REDISMODULE_KEYTYPE_EMPTY) { + if (!create) { + /* Key is empty but we cannot create */ + RedisModule_CloseKey(key); + *fsl = NULL; + return 1; + } + *fsl = fsl_type_create(); + RedisModule_ModuleTypeSetValue(key, fsltype, *fsl); + } else { + *fsl = RedisModule_ModuleTypeGetValue(key); + } + + RedisModule_CloseKey(key); + return 1; +} + +/* ========================== commands ======================= */ + +/* FSL.PUSH - Push an integer to the fixed-size list (to the right). + * It must be greater than the element in the head of the list. */ +int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long ele; + if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1)) + return REDISMODULE_OK; + + if (fsl->length == LIST_SIZE) + return RedisModule_ReplyWithError(ctx,"ERR list is full"); + + if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele) + return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); + + fsl->list[fsl->length++] = ele; + + if (fsl->length >= 2) + RedisModule_SignalKeyAsReady(ctx, argv[1]); + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->length < 2) + return REDISMODULE_ERR; + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + + +/* FSL.BPOP2 - Block clients until list has two or more elements. + * When that happens, unblock client and pop the last two elements (from the right). */ +int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->length < 2) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + NULL, timeout, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->list[fsl->length-1] <= gt) + return REDISMODULE_ERR; + + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { + /* Nothing to do because privdata is actually a 'long long', + * not a pointer to the heap */ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(privdata); +} + +/* FSL.BPOPGT - Block clients until list has an element greater than . + * When that happens, unblock client and pop the last element (from the right). */ +int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long gt; + if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->list[fsl->length-1] <= gt) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); + } else { + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = fsl_rdb_load, + .rdb_save = fsl_rdb_save, + .aof_rewrite = fsl_aofrw, + .mem_usage = NULL, + .free = fsl_free, + .digest = NULL + }; + + fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); + if (fsltype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl new file mode 100644 index 000000000..cb99ab1c9 --- /dev/null +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -0,0 +1,85 @@ +set testmodule [file normalize tests/modules/blockonkeys.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module client blocked on keys (no metadata): No block} { + r del k + r fsl.push k 33 + r fsl.push k 34 + r fsl.bpop2 k 0 + } {34 33} + + test {Module client blocked on keys (no metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 0 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + r fsl.push k 34 + $rd fsl.bpop2 k 0 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (with metadata): No block} { + r del k + r fsl.push k 34 + r fsl.bpopgt k 30 0 + } {34} + + test {Module client blocked on keys (with metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 35 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 33 0 + r fsl.push k 34 + assert_equal {34} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + r fsl.push k 33 + r fsl.push k 34 + r fsl.push k 35 + r fsl.push k 36 + assert_equal {36} [$rd read] + } + + test {Module client blocked on keys does not wake up on wrong type} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpop2 k 0 + r lpush k 12 + r lpush k 13 + r lpush k 14 + r del k + r fsl.push k 33 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } +} From b12d2f65d660b4139b322af90b6ef60ed267210b Mon Sep 17 00:00:00 2001 From: Loris Cro Date: Mon, 4 Nov 2019 16:36:06 +0100 Subject: [PATCH 6/7] fix unreported overflow in autogerenared stream IDs --- src/t_stream.c | 23 +++++++++++++---------- tests/unit/type/stream.tcl | 6 ++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index ea9a620f1..58b59f521 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -173,9 +173,19 @@ int streamCompareID(streamID *a, streamID *b) { * C_ERR if an ID was given via 'use_id', but adding it failed since the * current top ID is greater or equal. */ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { - /* If an ID was given, check that it's greater than the last entry ID - * or return an error. */ - if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR; + + /* Generate the new entry ID. */ + streamID id; + if (use_id) + id = *use_id; + else + streamNextID(&s->last_id,&id); + + /* Check that the new ID is greater than the last entry ID + * or return an error. Automatically generated IDs might + * overflow (and wrap-around) when incrementing the sequence + part. */ + if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR; /* Add the new entry. */ raxIterator ri; @@ -192,13 +202,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ } raxStop(&ri); - /* Generate the new entry ID. */ - streamID id; - if (use_id) - id = *use_id; - else - streamNextID(&s->last_id,&id); - /* We have to add the key into the radix tree in lexicographic order, * to do so we consider the ID as a single 128 bit number written in * big endian, so that the most significant bytes are the first ones. */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index a7415ae8d..aa9c5f3a9 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -79,6 +79,12 @@ start_server { assert {[streamCompareID $id2 $id3] == -1} } + test {XADD IDs correctly report an error when overflowing} { + r DEL mystream + r xadd mystream 18446744073709551615-18446744073709551615 a b + assert_error ERR* {r xadd mystream * c d} + } + test {XADD with MAXLEN option} { r DEL mystream for {set j 0} {$j < 1000} {incr j} { From 3adf10b8095475e2170831e036f67e6b2c9c6bdb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 4 Nov 2019 19:30:31 +0200 Subject: [PATCH 7/7] Test coverage for new module APIs: dbsize, flushall, randomkey, lru get/set --- tests/modules/misc.c | 69 +++++++++++++++++++++++++++++++++++ tests/unit/moduleapi/misc.tcl | 19 ++++++++++ 2 files changed, 88 insertions(+) diff --git a/tests/modules/misc.c b/tests/modules/misc.c index fd892f52c..7701a9c7c 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -40,6 +40,65 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ResetDataset(1, 0); + RedisModule_ReplyWithCString(ctx, "Ok"); + return REDISMODULE_OK; +} + +int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + long long ll = RedisModule_DbSize(ctx); + RedisModule_ReplyWithLongLong(ctx, ll); + return REDISMODULE_OK; +} + +int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *str = RedisModule_RandomKey(ctx); + RedisModule_ReplyWithString(ctx, str); + RedisModule_FreeString(ctx, str); + return REDISMODULE_OK; +} + +int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru, lfu; + RedisModule_GetLRUOrLFU(key, &lfu, &lru); + RedisModule_ReplyWithLongLong(ctx, lru); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru; + RedisModule_StringToLongLong(argv[2], &lru); + RedisModule_SetLRUOrLFU(key, -1, lru); + RedisModule_ReplyWithCString(ctx, "Ok"); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -50,6 +109,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index d392aeab0..ebfa9631f 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -16,4 +16,23 @@ start_server {tags {"modules"}} { assert { [string match "*cmdstat_module*" $info] } } + test {test module db commands} { + r set x foo + set key [r test.randomkey] + assert_equal $key "x" + assert_equal [r test.dbsize] 1 + r test.flushall + assert_equal [r test.dbsize] 0 + } + + test {test modle lru api} { + r set x foo + set lru [r test.getlru x] + assert { $lru <= 1 } + r test.setlru x 100 + set idle [r object idletime x] + assert { $idle >= 100 } + set lru [r test.getlru x] + assert { $lru >= 100 } + } }