diff --git a/src/aof.c b/src/aof.c index 0e3648ff0..dda9579e3 100644 --- a/src/aof.c +++ b/src/aof.c @@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createAOFClient(); - startLoadingFile(fp, filename); + startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ @@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb,NULL,1) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(1); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; - stopLoading(); + stopLoading(1); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size; @@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); + startSaving(RDBFLAGS_AOF_PREAMBLE); + if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } @@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) { if (rename(tmpfile,filename) == -1) { serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } diff --git a/src/debug.c b/src/debug.c index 179f6d2c9..a2d37337d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -417,7 +417,7 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(server.rdb_filename,NULL); + int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/module.c b/src/module.c index f9f654b42..d985eba16 100644 --- a/src/module.c +++ b/src/module.c @@ -1625,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) { return REDISMODULE_OK; } +/* This is an helper for moduleFireServerEvent() and other functions: + * It populates the replication info structure with the appropriate + * fields depending on the version provided. If the version is not valid + * then REDISMODULE_ERR is returned. Otherwise the function returns + * REDISMODULE_OK and the structure pointed by 'ri' gets populated. */ +int modulePopulateReplicationInfoStructure(void *ri, int structver) { + if (structver != 1) return REDISMODULE_ERR; + + RedisModuleReplicationInfoV1 *ri1 = ri; + memset(ri1,0,sizeof(*ri1)); + ri1->version = structver; + ri1->master = server.masterhost==NULL; + ri1->masterhost = server.masterhost? server.masterhost: ""; + ri1->masterport = server.masterport; + ri1->replid1 = server.replid; + ri1->replid2 = server.replid2; + ri1->repl1_offset = server.master_repl_offset; + ri1->repl2_offset = server.second_replid_offset; + return REDISMODULE_OK; +} + /* Return information about the client with the specified ID (that was * previously obtained via the RedisModule_GetClientId() API). If the * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR @@ -5949,8 +5970,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * * The following sub events are available: * - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER - * REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER + * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA * * The 'data' field can be casted by the callback to a * RedisModuleReplicationInfo structure with the following fields: @@ -5960,24 +5981,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * int masterport; // master instance port for NOW_REPLICA * char *replid1; // Main replication ID * char *replid2; // Secondary replication ID + * uint64_t repl1_offset; // Main replication offset * uint64_t repl2_offset; // Offset of replid2 validity - * uint64_t main_repl_offset; // Replication offset * * RedisModuleEvent_Persistence * * This event is called when RDB saving or AOF rewriting starts * and ends. The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start - * REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start - * REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end - * REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start - * REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end + * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START + * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START + * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED + * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED * * The above events are triggered not just when the user calls the * relevant commands like BGSAVE, but also when a saving operation * or AOF rewriting occurs because of internal server triggers. + * The SYNC_RDB_START sub events are happening in the forground due to + * SAVE command, FLUSHALL, or server shutdown, and the other RDB and + * AOF sub events are executed in a background fork child, so any + * action the module takes can only affect the generated AOF or RDB, + * but will not be reflected in the parent process and affect connected + * clients and commands. Also note that the AOF_START sub event may end + * up saving RDB content in case of an AOF with rdb-preamble. * * RedisModuleEvent_FlushDB * @@ -5985,8 +6012,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * because of replication, after the replica synchronization) * happened. The following sub events are available: * - * REDISMODULE_EVENT_FLUSHDB_START - * REDISMODULE_EVENT_FLUSHDB_END + * REDISMODULE_SUBEVENT_FLUSHDB_START + * REDISMODULE_SUBEVENT_FLUSHDB_END * * The data pointer can be casted to a RedisModuleFlushInfo * structure with the following fields: @@ -6010,12 +6037,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica is loading the RDB file from the master. * The following sub events are available: * - * REDISMODULE_EVENT_LOADING_RDB_START - * REDISMODULE_EVENT_LOADING_RDB_END - * REDISMODULE_EVENT_LOADING_MASTER_RDB_START - * REDISMODULE_EVENT_LOADING_MASTER_RDB_END - * REDISMODULE_EVENT_LOADING_AOF_START - * REDISMODULE_EVENT_LOADING_AOF_END + * REDISMODULE_SUBEVENT_LOADING_RDB_START + * REDISMODULE_SUBEVENT_LOADING_AOF_START + * REDISMODULE_SUBEVENT_LOADING_REPL_START + * REDISMODULE_SUBEVENT_LOADING_ENDED + * REDISMODULE_SUBEVENT_LOADING_FAILED + * + * Note that AOF loading may start with an RDB data in case of + * rdb-preamble, in which case you'll only recieve an AOF_START event. + * * * RedisModuleEvent_ClientChange * @@ -6024,8 +6054,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * structure, documented in RedisModule_GetClientInfoById(). * The following sub events are available: * - * REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED - * REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED + * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED * * RedisModuleEvent_Shutdown * @@ -6038,8 +6068,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replica since it gets disconnected. * The following sub events are availble: * - * REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE - * REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE + * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE * * No additional information is available so far: future versions * of Redis will have an API in order to enumerate the replicas @@ -6054,6 +6084,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * this changes depending on the "hz" configuration. * No sub events are available. * + * The data pointer can be casted to a RedisModuleCronLoop + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * * RedisModuleEvent_MasterLinkChange * * This is called for replicas in order to notify when the @@ -6063,8 +6098,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) { * replication is happening correctly. * The following sub events are available: * - * REDISMODULE_EVENT_MASTER_LINK_UP - * REDISMODULE_EVENT_MASTER_LINK_DOWN + * REDISMODULE_SUBEVENT_MASTER_LINK_UP + * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN + * + * RedisModuleEvent_ModuleChange + * + * This event is called when a new module is loaded or one is unloaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_MODULE_LOADED + * REDISMODULE_SUBEVENT_MODULE_UNLOADED + * + * The data pointer can be casted to a RedisModuleModuleChange + * structure with the following fields: + * + * const char* module_name; // Name of module loaded or unloaded. + * int32_t module_version; // Module version. + * + * RedisModuleEvent_LoadingProgress + * + * This event is called repeatedly called while an RDB or AOF file + * is being loaded. + * The following sub events are availble: + * + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB + * REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF + * + * The data pointer can be casted to a RedisModuleLoadingProgress + * structure with the following fields: + * + * int32_t hz; // Approximate number of events per second. + * int32_t progress; // Approximate progress between 0 and 1024, + * or -1 if unknown. * * The function returns REDISMODULE_OK if the module was successfully subscrived * for the specified event. If the API is called from a wrong context then @@ -6123,7 +6188,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { listRewind(RedisModule_EventListeners,&li); while((ln = listNext(&li))) { RedisModuleEventListener *el = ln->value; - if (el->event.id == eid && !el->module->in_hook) { + if (el->event.id == eid) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.module = el->module; @@ -6137,6 +6202,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { void *moduledata = NULL; RedisModuleClientInfoV1 civ1; + RedisModuleReplicationInfoV1 riv1; + RedisModuleModuleChangeV1 mcv1; /* Start at DB zero by default when calling the handler. It's * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct @@ -6148,11 +6215,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { modulePopulateClientInfoStructure(&civ1,data, el->event.dataver); moduledata = &civ1; + } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) { + modulePopulateReplicationInfoStructure(&riv1,el->event.dataver); + moduledata = &riv1; } else if (eid == REDISMODULE_EVENT_FLUSHDB) { moduledata = data; RedisModuleFlushInfoV1 *fi = data; if (fi->dbnum != -1) selectDb(ctx.client, fi->dbnum); + } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { + RedisModule *m = data; + if (m == el->module) + continue; + mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; + mcv1.module_name = m->name; + mcv1.module_version = m->ver; + moduledata = &mcv1; + } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) { + moduledata = data; + } else if (eid == REDISMODULE_EVENT_CRON_LOOP) { + moduledata = data; } ModulesInHooks++; @@ -6184,6 +6266,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } } +void processModuleLoadingProgressEvent(int is_aof) { + long long now = ustime(); + static long long next_event = 0; + if (now >= next_event) { + /* Fire the loading progress modules end event. */ + int progress = -1; + if (server.loading_total_bytes) + progress = (server.loading_total_bytes<<10) / server.loading_total_bytes; + RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION, + server.hz, + progress}; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS, + is_aof? + REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF: + REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB, + &fi); + /* decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -6352,6 +6455,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { ctx.module->blocked_clients = 0; ctx.module->handle = handle; serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); + /* Fire the loaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_LOADED, + ctx.module); + moduleFreeContext(&ctx); return C_OK; } @@ -6414,6 +6522,11 @@ int moduleUnload(sds name) { module->name, error); } + /* Fire the unloaded modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE, + REDISMODULE_SUBEVENT_MODULE_UNLOADED, + module); + /* Remove from list of modules. */ serverLog(LL_NOTICE,"Module %s unloaded",module->name); dictDelete(modules,module->name); diff --git a/src/networking.c b/src/networking.c index e7cc561fa..9336c177c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1118,6 +1118,11 @@ void freeClient(client *c) { if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) server.repl_no_slaves_since = server.unixtime; refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + if (c->replstate == SLAVE_STATE_ONLINE) + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE, + NULL); } /* Master/slave cleanup Case 2: diff --git a/src/rdb.c b/src/rdb.c index f530219a4..b569edfea 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { +int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; - int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; + int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; @@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { @@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && + if (rdbflags & RDBFLAGS_AOF_PREAMBLE && rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = rdb->processed_bytes; @@ -1254,18 +1254,21 @@ werr: int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; + startSaving(RDBFLAGS_REPLICATION); getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); if (error) *error = 0; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; + stopSaving(1); return C_OK; werr: /* Write error. */ /* Set 'error' only if not already set by rdbSaveRio() call. */ if (error && *error == 0) *error = errno; + stopSaving(0); return C_ERR; } @@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { } rioInitWithFile(&rdb,fp); + startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) { server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; + stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); + stopSaving(0); return C_ERR; } @@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. */ -void startLoading(size_t size) { +void startLoading(size_t size, int rdbflags) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; + + /* Fire the loading modules start event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START; + else if(rdbflags & RDBFLAGS_REPLICATION) + subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START; + else + subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL); } /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. * 'filename' is optional and used for rdb-check on error */ -void startLoadingFile(FILE *fp, char* filename) { +void startLoadingFile(FILE *fp, char* filename, int rdbflags) { struct stat sb; if (fstat(fileno(fp), &sb) == -1) sb.st_size = 0; rdbFileBeingLoaded = filename; - startLoading(sb.st_size); + startLoading(sb.st_size, rdbflags); } /* Refresh the loading progress info */ @@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) { } /* Loading finished */ -void stopLoading(void) { +void stopLoading(int success) { server.loading = 0; rdbFileBeingLoaded = NULL; + + /* Fire the loading modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_LOADING, + success? + REDISMODULE_SUBEVENT_LOADING_ENDED: + REDISMODULE_SUBEVENT_LOADING_FAILED, + NULL); +} + +void startSaving(int rdbflags) { + /* Fire the persistence modules end event. */ + int subevent; + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START; + else if (getpid()!=server.pid) + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START; + else + subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START; + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL); +} + +void stopSaving(int success) { + /* Fire the persistence modules end event. */ + moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE, + success? + REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: + REDISMODULE_SUBEVENT_PERSISTENCE_FAILED, + NULL); } /* Track loading progress in order to serve client's from time to time @@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); + processModuleLoadingProgressEvent(0); } } /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { + if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { @@ -2243,17 +2289,17 @@ eoferr: * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi) { +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { FILE *fp; rio rdb; int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoadingFile(fp, filename); + startLoadingFile(fp, filename,rdbflags); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb,rsi,0); + retval = rdbLoadRio(&rdb,rdbflags,rsi); fclose(fp); - stopLoading(); + stopLoading(retval==C_OK); return retval; } diff --git a/src/rdb.h b/src/rdb.h index 40a50f7ba..4229beea8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -121,8 +121,10 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) -#define RDB_SAVE_NONE 0 -#define RDB_SAVE_AOF_PREAMBLE (1<<0) +/* flags on the purpose of rdb save or load */ +#define RDBFLAGS_NONE 0 +#define RDBFLAGS_AOF_PREAMBLE (1<<0) +#define RDBFLAGS_REPLICATION (1<<1) int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); @@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); @@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof); +int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 5e7415046..1210d49b4 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoadingFile(fp, rdbfilename); + startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE); while(1) { robj *key, *val; @@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); - stopLoading(); + stopLoading(1); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); - stopLoading(); + stopLoading(0); return 1; } diff --git a/src/redismodule.h b/src/redismodule.h index ea0d6a139..0049f394e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -181,6 +181,8 @@ typedef uint64_t RedisModuleTimerID; #define REDISMODULE_EVENT_REPLICA_CHANGE 6 #define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7 #define REDISMODULE_EVENT_CRON_LOOP 8 +#define REDISMODULE_EVENT_MODULE_CHANGE 9 +#define REDISMODULE_EVENT_LOADING_PROGRESS 10 typedef struct RedisModuleEvent { uint64_t id; /* REDISMODULE_EVENT_... defines. */ @@ -226,18 +228,28 @@ static const RedisModuleEvent RedisModuleEvent_MasterLinkChange = { REDISMODULE_EVENT_MASTER_LINK_CHANGE, 1 + }, + RedisModuleEvent_ModuleChange = { + REDISMODULE_EVENT_MODULE_CHANGE, + 1 + }, + RedisModuleEvent_LoadingProgress = { + REDISMODULE_EVENT_LOADING_PROGRESS, + 1 }; /* Those are values that are used for the 'subevent' callback argument. */ #define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0 -#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2 -#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1 +#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2 +#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3 +#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4 #define REDISMODULE_SUBEVENT_LOADING_RDB_START 0 -#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1 -#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2 -#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3 +#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1 +#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2 +#define REDISMODULE_SUBEVENT_LOADING_ENDED 3 +#define REDISMODULE_SUBEVENT_LOADING_FAILED 4 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1 @@ -245,12 +257,21 @@ static const RedisModuleEvent #define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0 #define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0 -#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0 +#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1 + +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0 +#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1 #define REDISMODULE_SUBEVENT_FLUSHDB_START 0 #define REDISMODULE_SUBEVENT_FLUSHDB_END 1 +#define REDISMODULE_SUBEVENT_MODULE_LOADED 0 +#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1 + +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0 +#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1 + /* RedisModuleClientInfo flags. */ #define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0) #define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1) @@ -286,6 +307,22 @@ typedef struct RedisModuleClientInfo { #define RedisModuleClientInfo RedisModuleClientInfoV1 +#define REDISMODULE_REPLICATIONINFO_VERSION 1 +typedef struct RedisModuleReplicationInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int master; /* true if master, false if replica */ + char *masterhost; /* master instance hostname for NOW_REPLICA */ + int masterport; /* master instance port for NOW_REPLICA */ + char *replid1; /* Main replication ID */ + char *replid2; /* Secondary replication ID */ + uint64_t repl1_offset; /* Main replication offset */ + uint64_t repl2_offset; /* Offset of replid2 validity */ +} RedisModuleReplicationInfoV1; + +#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1 + #define REDISMODULE_FLUSHINFO_VERSION 1 typedef struct RedisModuleFlushInfo { uint64_t version; /* Not used since this structure is never passed @@ -297,6 +334,39 @@ typedef struct RedisModuleFlushInfo { #define RedisModuleFlushInfo RedisModuleFlushInfoV1 +#define REDISMODULE_MODULE_CHANGE_VERSION 1 +typedef struct RedisModuleModuleChange { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + const char* module_name;/* Name of module loaded or unloaded. */ + int32_t module_version; /* Module version. */ +} RedisModuleModuleChangeV1; + +#define RedisModuleModuleChange RedisModuleModuleChangeV1 + +#define REDISMODULE_CRON_LOOP_VERSION 1 +typedef struct RedisModuleCronLoopInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ +} RedisModuleCronLoopV1; + +#define RedisModuleCronLoop RedisModuleCronLoopV1 + +#define REDISMODULE_LOADING_PROGRESS_VERSION 1 +typedef struct RedisModuleLoadingProgressInfo { + uint64_t version; /* Not used since this structure is never passed + from the module to the core right now. Here + for future compatibility. */ + int32_t hz; /* Approximate number of events per second. */ + int32_t progress; /* Approximate progress between 0 and 1024, or -1 + * if unknown. */ +} RedisModuleLoadingProgressV1; + +#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1 + /* ------------------------- End of common defines ------------------------ */ #ifndef REDISMODULE_CORE diff --git a/src/replication.c b/src/replication.c index 4550e6a83..c9a2e0fe1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) { * has this state from the previous connection with the master. */ refreshGoodSlavesCount(); + + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); + return C_OK; /* The caller can return, no full resync needed. */ need_full_resync: @@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) { return; } refreshGoodSlavesCount(); + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); } @@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) { * We'll restore it when the RDB is received. */ connBlock(conn); connRecvTimeout(conn, server.repl_timeout*1000); - startLoading(server.repl_transfer_size); + startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION); - if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) { /* RDB loading failed. */ - stopLoading(); + stopLoading(0); serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " "from socket"); @@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) { * gets promoted. */ return; } - stopLoading(); + stopLoading(1); /* RDB loading succeeded if we reach this point. */ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(); return; } - if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk"); @@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) { server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* Fire the master link modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_UP, + NULL); + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ @@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) { replicationDiscardCachedMaster(); replicationCacheMasterUsingMyself(); } + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, + NULL); + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.repl_state = REPL_STATE_CONNECT; } /* Cancel replication, setting the instance as a master itself. */ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ + + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + sdsfree(server.masterhost); server.masterhost = NULL; /* When a slave is turned into a master, the current replication ID @@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) { * starting from now. Otherwise the backlog will be freed after a * failover if slaves do not connect immediately. */ server.repl_no_slaves_since = server.unixtime; + + /* Fire the role change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, + REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER, + NULL); } /* This function is called when the slave lose the connection with the * master into an unexpected way. */ void replicationHandleMasterDisconnection(void) { + /* Fire the master link modules event. */ + if (server.repl_state == REPL_STATE_CONNECTED) + moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, + REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, + NULL); + server.master = NULL; server.repl_state = REPL_STATE_CONNECT; server.repl_down_since = server.unixtime; diff --git a/src/server.c b/src/server.c index 8f165113d..f42924764 100644 --- a/src/server.c +++ b/src/server.c @@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { server.rdb_bgsave_scheduled = 0; } + /* Fire the cron loop modules event. */ + RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; + moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, + 0, + &ei); + server.cronloops++; return 1000/server.hz; } @@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) { } } + /* Fire the shutdown modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL); + /* Remove the pid file if possible and needed. */ if (server.daemonize || server.pidfile) { serverLog(LL_NOTICE,"Removing the pid file."); @@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) { serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { + if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); diff --git a/src/server.h b/src/server.h index f724f7d64..fba24b195 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,6 +1602,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); +void processModuleLoadingProgressEvent(int is_aof); int moduleTryServeClientBlockedOnKey(client *c, robj *key); void moduleUnblockClient(client *c); int moduleClientIsBlockedOnKeys(client *c); @@ -1834,10 +1835,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); /* Generic persistence functions */ -void startLoadingFile(FILE* fp, char* filename); -void startLoading(size_t size); +void startLoadingFile(FILE* fp, char* filename, int rdbflags); +void startLoading(size_t size, int rdbflags); void loadingProgress(off_t pos); -void stopLoading(void); +void stopLoading(int success); +void startSaving(int rdbflags); +void stopSaving(int success); #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */ @@ -1846,7 +1849,6 @@ int writeCommandsDeniedByDiskError(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); void killRDBChild(void); /* AOF persistence */ diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c index 33b690b2f..665a20481 100644 --- a/tests/modules/hooks.c +++ b/tests/modules/hooks.c @@ -30,36 +30,227 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" +#include +#include + +/* We need to store events to be able to test and see what we got, and we can't + * store them in the key-space since that would mess up rdb loading (duplicates) + * and be lost of flushdb. */ +RedisModuleDict *event_log = NULL; + +typedef struct EventElement { + long count; + RedisModuleString *last_val_string; + long last_val_int; +} EventElement; + +void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) { + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data)); + event->count++; +} + +void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) { + REDISMODULE_NOT_USED(ctx); + EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL); + if (!event) { + event = RedisModule_Alloc(sizeof(EventElement)); + memset(event, 0, sizeof(EventElement)); + RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event); + } + event->last_val_int = data; + event->count++; +} + +void FreeEvent(RedisModuleCtx *ctx, EventElement *event) { + if (event->last_val_string) + RedisModule_FreeString(ctx, event->last_val_string); + RedisModule_Free(event); +} + +int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + RedisModule_ReplyWithLongLong(ctx, event? event->count: 0); + return REDISMODULE_OK; +} + +int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2){ + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL); + if (event && event->last_val_string) + RedisModule_ReplyWithString(ctx, event->last_val_string); + else if (event) + RedisModule_ReplyWithLongLong(ctx, event->last_val_int); + else + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; +} + +void clearEvents(RedisModuleCtx *ctx) +{ + RedisModuleString *key; + EventElement *event; + RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL); + while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) { + event->count = 0; + event->last_val_int = 0; + if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string); + event->last_val_string = NULL; + RedisModule_DictDel(event_log, key, NULL); + RedisModule_Free(event); + } + RedisModule_DictIteratorStop(iter); +} + +int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argc); + REDISMODULE_NOT_USED(argv); + clearEvents(ctx); + return REDISMODULE_OK; +} /* Client state change callback. */ void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleClientInfo *ci = data; char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ? - "connected" : "disconnected"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id); - RedisModule_FreeCallReply(reply); + "client-connected" : "client-disconnected"; + LogNumericEvent(ctx, keyname, ci->id); } void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { - REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(e); RedisModuleFlushInfo *fi = data; char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? "flush-start" : "flush-end"; - RedisModuleCallReply *reply; - RedisModule_SelectDb(ctx,9); - reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum); - RedisModule_FreeCallReply(reply); + LogNumericEvent(ctx, keyname, fi->dbnum); +} + +void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + RedisModuleReplicationInfo *ri = data; + char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ? + "role-master" : "role-replica"; + LogStringEvent(ctx, keyname, ri->masterhost); +} + +void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ? + "replica-online" : "replica-offline"; + LogNumericEvent(ctx, keyname, 0); +} + +void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ? + "masterlink-up" : "masterlink-down"; + LogNumericEvent(ctx, keyname, 0); +} + +void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break; + case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break; + } + /* modifying the keyspace from the fork child is not an option, using log instead */ + RedisModule_Log(ctx, "warning", "module-event-%s", keyname); + if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START) + LogNumericEvent(ctx, keyname, 0); +} + +void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + + char *keyname = NULL; + switch (sub) { + case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break; + case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break; + case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break; + case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break; + case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break; + } + LogNumericEvent(ctx, keyname, 0); +} + +void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleLoadingProgress *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ? + "loading-progress-rdb" : "loading-progress-aof"; + LogNumericEvent(ctx, keyname, ei->progress); +} + +void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(data); + REDISMODULE_NOT_USED(sub); + + RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown"); +} + +void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(sub); + + RedisModuleCronLoop *ei = data; + LogNumericEvent(ctx, "cron-loop", ei->hz); +} + +void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) +{ + REDISMODULE_NOT_USED(e); + + RedisModuleModuleChange *ei = data; + char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ? + "module-loaded" : "module-unloaded"; + LogStringEvent(ctx, keyname, ei->module_name); } /* This function must be present on each Redis module. It is used in order to @@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; + /* replication related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ReplicaChange, replicationChangeCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback); + + /* persistence related hooks */ + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Persistence, persistenceCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Loading, loadingCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_LoadingProgress, loadingProgressCallback); + + /* other hooks */ RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClientChange, clientChangeCallback); RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_Shutdown, shutdownCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_CronLoop, cronLoopCallback); + RedisModule_SubscribeToServerEvent(ctx, + RedisModuleEvent_ModuleChange, moduleChangeCallback); + + event_log = RedisModule_CreateDict(ctx); + + if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + clearEvents(ctx); + RedisModule_FreeDict(ctx, event_log); + event_log = NULL; + return REDISMODULE_OK; +} + diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index 7a727902d..cbca9c3eb 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so] tags "modules" { start_server {} { r module load $testmodule + r config set appendonly yes + test {Test clients connection / disconnection hooks} { for {set j 0} {$j < 2} {incr j} { set rd1 [redis_deferring_client] $rd1 close } - assert {[r llen connected] > 1} - assert {[r llen disconnected] > 1} + assert {[r hooks.event_count client-connected] > 1} + assert {[r hooks.event_count client-disconnected] > 1} + } + + test {Test module cron hook} { + after 100 + assert {[r hooks.event_count cron-loop] > 0} + set hz [r hooks.event_last cron-loop] + assert_equal $hz 10 + } + + test {Test module loaded / unloaded hooks} { + set othermodule [file normalize tests/modules/infotest.so] + r module load $othermodule + r module unload infotest + assert_equal [r hooks.event_last module-loaded] "infotest" + assert_equal [r hooks.event_last module-unloaded] "infotest" + } + + test {Test module aofrw hook} { + r debug populate 1000 foo 10000 ;# 10mb worth of data + r config set rdbcompression no ;# rdb progress is only checked once in 2mb + r BGREWRITEAOF + waitForBgrewriteaof r + assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1 + assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1 + } + + test {Test module aof load and rdb/aof progress hooks} { + # create some aof tail (progress is checked only once in 1000 commands) + for {set j 0} {$j < 4000} {incr j} { + r set "bar$j" x + } + # set some configs that will cause many loading progress events during aof loading + r config set key-load-delay 1 + r config set dynamic-hz no + r config set hz 500 + r DEBUG LOADAOF + assert_equal [r hooks.event_last loading-aof-start] 0 + assert_equal [r hooks.event_last loading-end] 0 + assert {[r hooks.event_count loading-rdb-start] == 0} + assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section + assert {[r hooks.event_count loading-progress-aof] >= 2} + } + # undo configs before next test + r config set dynamic-hz yes + r config set key-load-delay 0 + + test {Test module rdb save hook} { + # debug reload does: save, flush, load: + assert {[r hooks.event_count persistence-syncrdb-start] == 0} + assert {[r hooks.event_count loading-rdb-start] == 0} + r debug reload + assert {[r hooks.event_count persistence-syncrdb-start] == 1} + assert {[r hooks.event_count loading-rdb-start] == 1} } test {Test flushdb hooks} { - r flushall ;# Note: only the "end" RPUSH will survive - r select 1 r flushdb - r select 2 - r flushdb - r select 9 - assert {[r llen flush-start] == 2} - assert {[r llen flush-end] == 3} - assert {[r lrange flush-start 0 -1] eq {1 2}} - assert {[r lrange flush-end 0 -1] eq {-1 1 2}} + assert_equal [r hooks.event_last flush-start] 9 + assert_equal [r hooks.event_last flush-end] 9 + r flushall + assert_equal [r hooks.event_last flush-start] -1 + assert_equal [r hooks.event_last flush-end] -1 } + + # replication related tests + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server {} { + r module load $testmodule + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [r info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + test {Test master link up hook} { + assert_equal [r hooks.event_count masterlink-up] 1 + assert_equal [r hooks.event_count masterlink-down] 0 + } + + test {Test role-replica hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 0 + assert_equal [r hooks.event_last role-replica] [s 0 master_host] + } + + test {Test replica-online hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 0 + } + + test {Test master link down hook} { + r client kill type master + assert_equal [r hooks.event_count masterlink-down] 1 + } + + $replica replicaof no one + + test {Test role-master hook} { + assert_equal [r hooks.event_count role-replica] 1 + assert_equal [r hooks.event_count role-master] 1 + assert_equal [r hooks.event_last role-master] {} + } + + test {Test replica-offline hook} { + assert_equal [r -1 hooks.event_count replica-online] 1 + assert_equal [r -1 hooks.event_count replica-offline] 1 + } + # get the replica stdout, to be used by the next test + set replica_stdout [srv 0 stdout] + } + + + # look into the log file of the server that just exited + test {Test shutdown hook} { + assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1 + } + } }