Modules hooks: complete missing hooks for the initial set of hooks

* replication hooks: role change, master link status, replica online/offline
* persistence hooks: saving, loading, loading progress
* misc hooks: cron loop, shutdown, module loaded/unloaded
* change the way hooks test work, and add tests for all of the above

startLoading() now gets flag indicating what is loaded.
stopLoading() now gets an indication of success or failure.
adding startSaving() and stopSaving() with similar args and role.
This commit is contained in:
Oran Agra 2019-10-29 17:59:09 +02:00
parent 6f4ef70666
commit 68c6aacf3b
13 changed files with 737 additions and 94 deletions

View File

@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF; server.aof_state = AOF_OFF;
fakeClient = createAOFClient(); fakeClient = createAOFClient();
startLoadingFile(fp, filename); startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to /* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */ * load the RDB file and later continue loading the AOF tail. */
@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr; if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,NULL,1) != C_OK) { if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr; goto readerr;
} else { } else {
@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) {
if (!(loops++ % 1000)) { if (!(loops++ % 1000)) {
loadingProgress(ftello(fp)); loadingProgress(ftello(fp));
processEventsWhileBlocked(); processEventsWhileBlocked();
processModuleLoadingProgressEvent(1);
} }
if (fgets(buf,sizeof(buf),fp) == NULL) { if (fgets(buf,sizeof(buf),fp) == NULL) {
@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp); fclose(fp);
freeFakeClient(fakeClient); freeFakeClient(fakeClient);
server.aof_state = old_aof_state; server.aof_state = old_aof_state;
stopLoading(); stopLoading(1);
aofUpdateCurrentSize(); aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size; server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size; server.aof_fsync_offset = server.aof_current_size;
@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_rewrite_incremental_fsync) if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) { if (server.aof_use_rdb_preamble) {
int error; int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error; errno = error;
goto werr; goto werr;
} }
@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) {
if (rename(tmpfile,filename) == -1) { if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile); unlink(tmpfile);
stopSaving(0);
return C_ERR; return C_ERR;
} }
serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK; return C_OK;
werr: werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp); fclose(fp);
unlink(tmpfile); unlink(tmpfile);
stopSaving(0);
return C_ERR; return C_ERR;
} }

View File

@ -417,7 +417,7 @@ NULL
} }
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c); protectClient(c);
int ret = rdbLoad(server.rdb_filename,NULL); int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
unprotectClient(c); unprotectClient(c);
if (ret != C_OK) { if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump"); addReplyError(c,"Error trying to load the RDB dump");

View File

@ -1620,6 +1620,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK; return REDISMODULE_OK;
} }
/* This is an helper for moduleFireServerEvent() and other functions:
* It populates the replication info structure with the appropriate
* fields depending on the version provided. If the version is not valid
* then REDISMODULE_ERR is returned. Otherwise the function returns
* REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
int modulePopulateReplicationInfoStructure(void *ri, int structver) {
if (structver != 1) return REDISMODULE_ERR;
RedisModuleReplicationInfoV1 *ri1 = ri;
memset(ri1,0,sizeof(*ri1));
ri1->version = structver;
ri1->master = server.masterhost==NULL;
ri1->masterhost = server.masterhost? server.masterhost: "";
ri1->masterport = server.masterport;
ri1->replid1 = server.replid;
ri1->replid2 = server.replid2;
ri1->repl1_offset = server.master_repl_offset;
ri1->repl2_offset = server.second_replid_offset;
return REDISMODULE_OK;
}
/* Return information about the client with the specified ID (that was /* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the * previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR * client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@ -5780,8 +5801,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* *
* The following sub events are available: * The following sub events are available:
* *
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA * REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
* *
* The 'data' field can be casted by the callback to a * The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields: * RedisModuleReplicationInfo structure with the following fields:
@ -5791,24 +5812,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA * int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID * char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID * char *replid2; // Secondary replication ID
* uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity * uint64_t repl2_offset; // Offset of replid2 validity
* uint64_t main_repl_offset; // Replication offset
* *
* RedisModuleEvent_Persistence * RedisModuleEvent_Persistence
* *
* This event is called when RDB saving or AOF rewriting starts * This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available: * and ends. The following sub events are available:
* *
* REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start * REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
* REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end * REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start * REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end * REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
* REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start * REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
* REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
* *
* The above events are triggered not just when the user calls the * The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation * relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers. * or AOF rewriting occurs because of internal server triggers.
* The SYNC_RDB_START sub events are happening in the forground due to
* SAVE command, FLUSHALL, or server shutdown, and the other RDB and
* AOF sub events are executed in a background fork child, so any
* action the module takes can only affect the generated AOF or RDB,
* but will not be reflected in the parent process and affect connected
* clients and commands. Also note that the AOF_START sub event may end
* up saving RDB content in case of an AOF with rdb-preamble.
* *
* RedisModuleEvent_FlushDB * RedisModuleEvent_FlushDB
* *
@ -5816,8 +5843,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization) * because of replication, after the replica synchronization)
* happened. The following sub events are available: * happened. The following sub events are available:
* *
* REDISMODULE_EVENT_FLUSHDB_START * REDISMODULE_SUBEVENT_FLUSHDB_START
* REDISMODULE_EVENT_FLUSHDB_END * REDISMODULE_SUBEVENT_FLUSHDB_END
* *
* The data pointer can be casted to a RedisModuleFlushInfo * The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields: * structure with the following fields:
@ -5841,12 +5868,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master. * replica is loading the RDB file from the master.
* The following sub events are available: * The following sub events are available:
* *
* REDISMODULE_EVENT_LOADING_RDB_START * REDISMODULE_SUBEVENT_LOADING_RDB_START
* REDISMODULE_EVENT_LOADING_RDB_END * REDISMODULE_SUBEVENT_LOADING_AOF_START
* REDISMODULE_EVENT_LOADING_MASTER_RDB_START * REDISMODULE_SUBEVENT_LOADING_REPL_START
* REDISMODULE_EVENT_LOADING_MASTER_RDB_END * REDISMODULE_SUBEVENT_LOADING_ENDED
* REDISMODULE_EVENT_LOADING_AOF_START * REDISMODULE_SUBEVENT_LOADING_FAILED
* REDISMODULE_EVENT_LOADING_AOF_END *
* Note that AOF loading may start with an RDB data in case of
* rdb-preamble, in which case you'll only recieve an AOF_START event.
*
* *
* RedisModuleEvent_ClientChange * RedisModuleEvent_ClientChange
* *
@ -5855,8 +5885,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById(). * structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available: * The following sub events are available:
* *
* REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED * REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
* REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED * REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
* *
* RedisModuleEvent_Shutdown * RedisModuleEvent_Shutdown
* *
@ -5869,8 +5899,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected. * replica since it gets disconnected.
* The following sub events are availble: * The following sub events are availble:
* *
* REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE * REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
* REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE * REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
* *
* No additional information is available so far: future versions * No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas * of Redis will have an API in order to enumerate the replicas
@ -5885,6 +5915,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration. * this changes depending on the "hz" configuration.
* No sub events are available. * No sub events are available.
* *
* The data pointer can be casted to a RedisModuleCronLoop
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
*
* RedisModuleEvent_MasterLinkChange * RedisModuleEvent_MasterLinkChange
* *
* This is called for replicas in order to notify when the * This is called for replicas in order to notify when the
@ -5894,8 +5929,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly. * replication is happening correctly.
* The following sub events are available: * The following sub events are available:
* *
* REDISMODULE_EVENT_MASTER_LINK_UP * REDISMODULE_SUBEVENT_MASTER_LINK_UP
* REDISMODULE_EVENT_MASTER_LINK_DOWN * REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
*
* RedisModuleEvent_ModuleChange
*
* This event is called when a new module is loaded or one is unloaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_MODULE_LOADED
* REDISMODULE_SUBEVENT_MODULE_UNLOADED
*
* The data pointer can be casted to a RedisModuleModuleChange
* structure with the following fields:
*
* const char* module_name; // Name of module loaded or unloaded.
* int32_t module_version; // Module version.
*
* RedisModuleEvent_LoadingProgress
*
* This event is called repeatedly called while an RDB or AOF file
* is being loaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
*
* The data pointer can be casted to a RedisModuleLoadingProgress
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
* int32_t progress; // Approximate progress between 0 and 1024,
* or -1 if unknown.
* *
* The function returns REDISMODULE_OK if the module was successfully subscrived * The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then * for the specified event. If the API is called from a wrong context then
@ -5954,7 +6019,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li); listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value; RedisModuleEventListener *el = ln->value;
if (el->event.id == eid && !el->module->in_hook) { if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT; RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module; ctx.module = el->module;
@ -5968,6 +6033,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL; void *moduledata = NULL;
RedisModuleClientInfoV1 civ1; RedisModuleClientInfoV1 civ1;
RedisModuleReplicationInfoV1 riv1;
RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's /* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes * up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct * sense. For instance for FLUSHDB events we select the correct
@ -5979,11 +6046,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data, modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver); el->event.dataver);
moduledata = &civ1; moduledata = &civ1;
} else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) { } else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data; moduledata = data;
RedisModuleFlushInfoV1 *fi = data; RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1) if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum); selectDb(ctx.client, fi->dbnum);
} else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
RedisModule *m = data;
if (m == el->module)
continue;
mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
mcv1.module_name = m->name;
mcv1.module_version = m->ver;
moduledata = &mcv1;
} else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
moduledata = data;
} }
ModulesInHooks++; ModulesInHooks++;
@ -6015,6 +6097,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
} }
} }
void processModuleLoadingProgressEvent(int is_aof) {
long long now = ustime();
static long long next_event = 0;
if (now >= next_event) {
/* Fire the loading progress modules end event. */
int progress = -1;
if (server.loading_total_bytes)
progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
server.hz,
progress};
moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
is_aof?
REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
&fi);
/* decide when the next event should fire. */
next_event = now + 1000000 / server.hz;
}
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Modules API internals * Modules API internals
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -6183,6 +6286,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0; ctx.module->blocked_clients = 0;
ctx.module->handle = handle; ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path); serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
/* Fire the loaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_LOADED,
ctx.module);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
return C_OK; return C_OK;
} }
@ -6245,6 +6353,11 @@ int moduleUnload(sds name) {
module->name, error); module->name, error);
} }
/* Fire the unloaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_UNLOADED,
module);
/* Remove from list of modules. */ /* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name); serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name); dictDelete(modules,module->name);

View File

@ -1118,6 +1118,11 @@ void freeClient(client *c) {
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime; server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount(); refreshGoodSlavesCount();
/* Fire the replica change modules event. */
if (c->replstate == SLAVE_STATE_ONLINE)
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
NULL);
} }
/* Master/slave cleanup Case 2: /* Master/slave cleanup Case 2:

View File

@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
} }
/* Save a few default AUX fields with information about the RDB generated. */ /* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32; int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */ /* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the * When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O * integer pointed by 'error' is set to the value of errno just after the I/O
* error. */ * error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL; dictIterator *di = NULL;
dictEntry *de; dictEntry *de;
char magic[10]; char magic[10];
@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum; rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move /* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in * accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */ * order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE && if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{ {
processed = rdb->processed_bytes; processed = rdb->processed_bytes;
@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE]; char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0; if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK; return C_OK;
werr: /* Write error. */ werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */ /* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno; if (error && *error == 0) *error = errno;
stopSaving(0);
return C_ERR; return C_ERR;
} }
@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
} }
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync) if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error; errno = error;
goto werr; goto werr;
} }
@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown", cwdp ? cwdp : "unknown",
strerror(errno)); strerror(errno));
unlink(tmpfile); unlink(tmpfile);
stopSaving(0);
return C_ERR; return C_ERR;
} }
@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0; server.dirty = 0;
server.lastsave = time(NULL); server.lastsave = time(NULL);
server.lastbgsave_status = C_OK; server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK; return C_OK;
werr: werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp); fclose(fp);
unlink(tmpfile); unlink(tmpfile);
stopSaving(0);
return C_ERR; return C_ERR;
} }
@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields /* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */ * needed to provide loading stats. */
void startLoading(size_t size) { void startLoading(size_t size, int rdbflags) {
/* Load the DB */ /* Load the DB */
server.loading = 1; server.loading = 1;
server.loading_start_time = time(NULL); server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0; server.loading_loaded_bytes = 0;
server.loading_total_bytes = size; server.loading_total_bytes = size;
/* Fire the loading modules start event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
else if(rdbflags & RDBFLAGS_REPLICATION)
subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
else
subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
} }
/* Mark that we are loading in the global state and setup the fields /* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. * needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */ * 'filename' is optional and used for rdb-check on error */
void startLoadingFile(FILE *fp, char* filename) { void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb; struct stat sb;
if (fstat(fileno(fp), &sb) == -1) if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0; sb.st_size = 0;
rdbFileBeingLoaded = filename; rdbFileBeingLoaded = filename;
startLoading(sb.st_size); startLoading(sb.st_size, rdbflags);
} }
/* Refresh the loading progress info */ /* Refresh the loading progress info */
@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
} }
/* Loading finished */ /* Loading finished */
void stopLoading(void) { void stopLoading(int success) {
server.loading = 0; server.loading = 0;
rdbFileBeingLoaded = NULL; rdbFileBeingLoaded = NULL;
/* Fire the loading modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
success?
REDISMODULE_SUBEVENT_LOADING_ENDED:
REDISMODULE_SUBEVENT_LOADING_FAILED,
NULL);
}
void startSaving(int rdbflags) {
/* Fire the persistence modules end event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
else if (getpid()!=server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
else
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}
void stopSaving(int success) {
/* Fire the persistence modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
success?
REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
NULL);
} }
/* Track loading progress in order to serve client's from time to time /* Track loading progress in order to serve client's from time to time
@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster(); replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes); loadingProgress(r->processed_bytes);
processEventsWhileBlocked(); processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
} }
} }
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid; uint64_t dbid;
int type, rdbver; int type, rdbver;
redisDb *db = server.db+0; redisDb *db = server.db+0;
@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is * received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the * responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */ * snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key); decrRefCount(key);
decrRefCount(val); decrRefCount(val);
} else { } else {
@ -2243,17 +2289,17 @@ eoferr:
* *
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */ * loading code will fiil the information fields in the structure. */
int rdbLoad(char *filename, rdbSaveInfo *rsi) { int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp; FILE *fp;
rio rdb; rio rdb;
int retval; int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR; if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename); startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0); retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp); fclose(fp);
stopLoading(); stopLoading(retval==C_OK);
return retval; return retval;
} }

View File

@ -121,8 +121,10 @@
#define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2) #define RDB_LOAD_SDS (1<<2)
#define RDB_SAVE_NONE 0 /* flags on the purpose of rdb save or load */
#define RDB_SAVE_AOF_PREAMBLE (1<<0) #define RDBFLAGS_NONE 0
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
#define RDBFLAGS_REPLICATION (1<<1)
int rdbSaveType(rio *rdb, unsigned char type); int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb); int rdbLoadType(rio *rdb);
@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o); int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb); int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi); int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid); void rdbRemoveTempFile(pid_t childpid);
@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif #endif

View File

@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
} }
expiretime = -1; expiretime = -1;
startLoadingFile(fp, rdbfilename); startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
while(1) { while(1) {
robj *key, *val; robj *key, *val;
@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
} }
if (closefile) fclose(fp); if (closefile) fclose(fp);
stopLoading(); stopLoading(1);
return 0; return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */ eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
} }
err: err:
if (closefile) fclose(fp); if (closefile) fclose(fp);
stopLoading(); stopLoading(0);
return 1; return 1;
} }

View File

@ -181,6 +181,8 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_REPLICA_CHANGE 6 #define REDISMODULE_EVENT_REPLICA_CHANGE 6
#define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7 #define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7
#define REDISMODULE_EVENT_CRON_LOOP 8 #define REDISMODULE_EVENT_CRON_LOOP 8
#define REDISMODULE_EVENT_MODULE_CHANGE 9
#define REDISMODULE_EVENT_LOADING_PROGRESS 10
typedef struct RedisModuleEvent { typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */ uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -226,18 +228,28 @@ static const RedisModuleEvent
RedisModuleEvent_MasterLinkChange = { RedisModuleEvent_MasterLinkChange = {
REDISMODULE_EVENT_MASTER_LINK_CHANGE, REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1 1
},
RedisModuleEvent_ModuleChange = {
REDISMODULE_EVENT_MODULE_CHANGE,
1
},
RedisModuleEvent_LoadingProgress = {
REDISMODULE_EVENT_LOADING_PROGRESS,
1
}; };
/* Those are values that are used for the 'subevent' callback argument. */ /* Those are values that are used for the 'subevent' callback argument. */
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0 #define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1 #define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2 #define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3 #define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3
#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4
#define REDISMODULE_SUBEVENT_LOADING_RDB_START 0 #define REDISMODULE_SUBEVENT_LOADING_RDB_START 0
#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1 #define REDISMODULE_SUBEVENT_LOADING_AOF_START 1
#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2 #define REDISMODULE_SUBEVENT_LOADING_REPL_START 2
#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3 #define REDISMODULE_SUBEVENT_LOADING_ENDED 3
#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1 #define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1
@ -245,12 +257,21 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0 #define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0
#define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1 #define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0 #define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1 #define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1
#define REDISMODULE_SUBEVENT_FLUSHDB_START 0 #define REDISMODULE_SUBEVENT_FLUSHDB_START 0
#define REDISMODULE_SUBEVENT_FLUSHDB_END 1 #define REDISMODULE_SUBEVENT_FLUSHDB_END 1
#define REDISMODULE_SUBEVENT_MODULE_LOADED 0
#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
/* RedisModuleClientInfo flags. */ /* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0) #define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1) #define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
@ -286,6 +307,22 @@ typedef struct RedisModuleClientInfo {
#define RedisModuleClientInfo RedisModuleClientInfoV1 #define RedisModuleClientInfo RedisModuleClientInfoV1
#define REDISMODULE_REPLICATIONINFO_VERSION 1
typedef struct RedisModuleReplicationInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int master; /* true if master, false if replica */
char *masterhost; /* master instance hostname for NOW_REPLICA */
int masterport; /* master instance port for NOW_REPLICA */
char *replid1; /* Main replication ID */
char *replid2; /* Secondary replication ID */
uint64_t repl1_offset; /* Main replication offset */
uint64_t repl2_offset; /* Offset of replid2 validity */
} RedisModuleReplicationInfoV1;
#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1
#define REDISMODULE_FLUSHINFO_VERSION 1 #define REDISMODULE_FLUSHINFO_VERSION 1
typedef struct RedisModuleFlushInfo { typedef struct RedisModuleFlushInfo {
uint64_t version; /* Not used since this structure is never passed uint64_t version; /* Not used since this structure is never passed
@ -297,6 +334,39 @@ typedef struct RedisModuleFlushInfo {
#define RedisModuleFlushInfo RedisModuleFlushInfoV1 #define RedisModuleFlushInfo RedisModuleFlushInfoV1
#define REDISMODULE_MODULE_CHANGE_VERSION 1
typedef struct RedisModuleModuleChange {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
const char* module_name;/* Name of module loaded or unloaded. */
int32_t module_version; /* Module version. */
} RedisModuleModuleChangeV1;
#define RedisModuleModuleChange RedisModuleModuleChangeV1
#define REDISMODULE_CRON_LOOP_VERSION 1
typedef struct RedisModuleCronLoopInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
} RedisModuleCronLoopV1;
#define RedisModuleCronLoop RedisModuleCronLoopV1
#define REDISMODULE_LOADING_PROGRESS_VERSION 1
typedef struct RedisModuleLoadingProgressInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
int32_t progress; /* Approximate progress between 0 and 1024, or -1
* if unknown. */
} RedisModuleLoadingProgressV1;
#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1
/* ------------------------- End of common defines ------------------------ */ /* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE #ifndef REDISMODULE_CORE

View File

@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) {
* has this state from the previous connection with the master. */ * has this state from the previous connection with the master. */
refreshGoodSlavesCount(); refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
return C_OK; /* The caller can return, no full resync needed. */ return C_OK; /* The caller can return, no full resync needed. */
need_full_resync: need_full_resync:
@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) {
return; return;
} }
refreshGoodSlavesCount(); refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave)); replicationGetSlaveName(slave));
} }
@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */ * We'll restore it when the RDB is received. */
connBlock(conn); connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000); connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */ /* RDB loading failed. */
stopLoading(); stopLoading(0);
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB " "Failed trying to load the MASTER synchronization DB "
"from socket"); "from socket");
@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) {
* gets promoted. */ * gets promoted. */
return; return;
} }
stopLoading(); stopLoading(1);
/* RDB loading succeeded if we reach this point. */ /* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(); cancelReplicationHandshake();
return; return;
} }
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization " "Failed trying to load the MASTER synchronization "
"DB from disk"); "DB from disk");
@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) {
server.repl_state = REPL_STATE_CONNECTED; server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0; server.repl_down_since = 0;
/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
/* After a full resynchroniziation we use the replication ID and /* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since * offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */ * we are starting a new history. */
@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) {
replicationDiscardCachedMaster(); replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself(); replicationCacheMasterUsingMyself();
} }
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT; server.repl_state = REPL_STATE_CONNECT;
} }
/* Cancel replication, setting the instance as a master itself. */ /* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) { void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */ if (server.masterhost == NULL) return; /* Nothing to do. */
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
sdsfree(server.masterhost); sdsfree(server.masterhost);
server.masterhost = NULL; server.masterhost = NULL;
/* When a slave is turned into a master, the current replication ID /* When a slave is turned into a master, the current replication ID
@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a * starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */ * failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime; server.repl_no_slaves_since = server.unixtime;
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
} }
/* This function is called when the slave lose the connection with the /* This function is called when the slave lose the connection with the
* master into an unexpected way. */ * master into an unexpected way. */
void replicationHandleMasterDisconnection(void) { void replicationHandleMasterDisconnection(void) {
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.master = NULL; server.master = NULL;
server.repl_state = REPL_STATE_CONNECT; server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime; server.repl_down_since = server.unixtime;

View File

@ -2056,6 +2056,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0; server.rdb_bgsave_scheduled = 0;
} }
/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);
server.cronloops++; server.cronloops++;
return 1000/server.hz; return 1000/server.hz;
} }
@ -3682,6 +3688,9 @@ int prepareForShutdown(int flags) {
} }
} }
/* Fire the shutdown modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
/* Remove the pid file if possible and needed. */ /* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) { if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file."); serverLog(LL_NOTICE,"Removing the pid file.");
@ -4767,7 +4776,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else { } else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000); (float)(ustime()-start)/1000000);

View File

@ -1602,6 +1602,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors(); int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data); void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);
@ -1831,10 +1832,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn); void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename); void startLoadingFile(FILE* fp, char* filename, int rdbflags);
void startLoading(size_t size); void startLoading(size_t size, int rdbflags);
void loadingProgress(off_t pos); void loadingProgress(off_t pos);
void stopLoading(void); void stopLoading(int success);
void startSaving(int rdbflags);
void stopSaving(int success);
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */ #define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */ #define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
@ -1843,7 +1846,6 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */ /* RDB persistence */
#include "rdb.h" #include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void); void killRDBChild(void);
/* AOF persistence */ /* AOF persistence */

View File

@ -30,36 +30,227 @@
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h" #include "redismodule.h"
#include <stdio.h>
#include <string.h>
/* We need to store events to be able to test and see what we got, and we can't
* store them in the key-space since that would mess up rdb loading (duplicates)
* and be lost of flushdb. */
RedisModuleDict *event_log = NULL;
typedef struct EventElement {
long count;
RedisModuleString *last_val_string;
long last_val_int;
} EventElement;
void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) {
EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
if (!event) {
event = RedisModule_Alloc(sizeof(EventElement));
memset(event, 0, sizeof(EventElement));
RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
}
if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data));
event->count++;
}
void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) {
REDISMODULE_NOT_USED(ctx);
EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
if (!event) {
event = RedisModule_Alloc(sizeof(EventElement));
memset(event, 0, sizeof(EventElement));
RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
}
event->last_val_int = data;
event->count++;
}
void FreeEvent(RedisModuleCtx *ctx, EventElement *event) {
if (event->last_val_string)
RedisModule_FreeString(ctx, event->last_val_string);
RedisModule_Free(event);
}
int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
RedisModule_ReplyWithLongLong(ctx, event? event->count: 0);
return REDISMODULE_OK;
}
int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
if (event && event->last_val_string)
RedisModule_ReplyWithString(ctx, event->last_val_string);
else if (event)
RedisModule_ReplyWithLongLong(ctx, event->last_val_int);
else
RedisModule_ReplyWithNull(ctx);
return REDISMODULE_OK;
}
void clearEvents(RedisModuleCtx *ctx)
{
RedisModuleString *key;
EventElement *event;
RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL);
while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) {
event->count = 0;
event->last_val_int = 0;
if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
event->last_val_string = NULL;
RedisModule_DictDel(event_log, key, NULL);
RedisModule_Free(event);
}
RedisModule_DictIteratorStop(iter);
}
int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argc);
REDISMODULE_NOT_USED(argv);
clearEvents(ctx);
return REDISMODULE_OK;
}
/* Client state change callback. */ /* Client state change callback. */
void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{ {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e); REDISMODULE_NOT_USED(e);
RedisModuleClientInfo *ci = data; RedisModuleClientInfo *ci = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ? char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ?
"connected" : "disconnected"; "client-connected" : "client-disconnected";
RedisModuleCallReply *reply; LogNumericEvent(ctx, keyname, ci->id);
RedisModule_SelectDb(ctx,9);
reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id);
RedisModule_FreeCallReply(reply);
} }
void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{ {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e); REDISMODULE_NOT_USED(e);
RedisModuleFlushInfo *fi = data; RedisModuleFlushInfo *fi = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
"flush-start" : "flush-end"; "flush-start" : "flush-end";
RedisModuleCallReply *reply; LogNumericEvent(ctx, keyname, fi->dbnum);
RedisModule_SelectDb(ctx,9); }
reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum);
RedisModule_FreeCallReply(reply); void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
RedisModuleReplicationInfo *ri = data;
char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ?
"role-master" : "role-replica";
LogStringEvent(ctx, keyname, ri->masterhost);
}
void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ?
"replica-online" : "replica-offline";
LogNumericEvent(ctx, keyname, 0);
}
void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ?
"masterlink-up" : "masterlink-down";
LogNumericEvent(ctx, keyname, 0);
}
void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = NULL;
switch (sub) {
case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break;
}
/* modifying the keyspace from the fork child is not an option, using log instead */
RedisModule_Log(ctx, "warning", "module-event-%s", keyname);
if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START)
LogNumericEvent(ctx, keyname, 0);
}
void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = NULL;
switch (sub) {
case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break;
case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break;
case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break;
case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break;
case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break;
}
LogNumericEvent(ctx, keyname, 0);
}
void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
RedisModuleLoadingProgress *ei = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ?
"loading-progress-rdb" : "loading-progress-aof";
LogNumericEvent(ctx, keyname, ei->progress);
}
void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
REDISMODULE_NOT_USED(sub);
RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown");
}
void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(sub);
RedisModuleCronLoop *ei = data;
LogNumericEvent(ctx, "cron-loop", ei->hz);
}
void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
RedisModuleModuleChange *ei = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ?
"module-loaded" : "module-unloaded";
LogStringEvent(ctx, keyname, ei->module_name);
} }
/* This function must be present on each Redis module. It is used in order to /* This function must be present on each Redis module. It is used in order to
@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1) if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR; == REDISMODULE_ERR) return REDISMODULE_ERR;
/* replication related hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ReplicaChange, replicationChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback);
/* persistence related hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Persistence, persistenceCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Loading, loadingCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_LoadingProgress, loadingProgressCallback);
/* other hooks */
RedisModule_SubscribeToServerEvent(ctx, RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ClientChange, clientChangeCallback); RedisModuleEvent_ClientChange, clientChangeCallback);
RedisModule_SubscribeToServerEvent(ctx, RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_FlushDB, flushdbCallback); RedisModuleEvent_FlushDB, flushdbCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Shutdown, shutdownCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_CronLoop, cronLoopCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ModuleChange, moduleChangeCallback);
event_log = RedisModule_CreateDict(ctx);
if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK; return REDISMODULE_OK;
} }
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
clearEvents(ctx);
RedisModule_FreeDict(ctx, event_log);
event_log = NULL;
return REDISMODULE_OK;
}

View File

@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so]
tags "modules" { tags "modules" {
start_server {} { start_server {} {
r module load $testmodule r module load $testmodule
r config set appendonly yes
test {Test clients connection / disconnection hooks} { test {Test clients connection / disconnection hooks} {
for {set j 0} {$j < 2} {incr j} { for {set j 0} {$j < 2} {incr j} {
set rd1 [redis_deferring_client] set rd1 [redis_deferring_client]
$rd1 close $rd1 close
} }
assert {[r llen connected] > 1} assert {[r hooks.event_count client-connected] > 1}
assert {[r llen disconnected] > 1} assert {[r hooks.event_count client-disconnected] > 1}
}
test {Test module cron hook} {
after 100
assert {[r hooks.event_count cron-loop] > 0}
set hz [r hooks.event_last cron-loop]
assert_equal $hz 10
}
test {Test module loaded / unloaded hooks} {
set othermodule [file normalize tests/modules/infotest.so]
r module load $othermodule
r module unload infotest
assert_equal [r hooks.event_last module-loaded] "infotest"
assert_equal [r hooks.event_last module-unloaded] "infotest"
}
test {Test module aofrw hook} {
r debug populate 1000 foo 10000 ;# 10mb worth of data
r config set rdbcompression no ;# rdb progress is only checked once in 2mb
r BGREWRITEAOF
waitForBgrewriteaof r
assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1
assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1
}
test {Test module aof load and rdb/aof progress hooks} {
# create some aof tail (progress is checked only once in 1000 commands)
for {set j 0} {$j < 4000} {incr j} {
r set "bar$j" x
}
# set some configs that will cause many loading progress events during aof loading
r config set key-load-delay 1
r config set dynamic-hz no
r config set hz 500
r DEBUG LOADAOF
assert_equal [r hooks.event_last loading-aof-start] 0
assert_equal [r hooks.event_last loading-end] 0
assert {[r hooks.event_count loading-rdb-start] == 0}
assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section
assert {[r hooks.event_count loading-progress-aof] >= 2}
}
# undo configs before next test
r config set dynamic-hz yes
r config set key-load-delay 0
test {Test module rdb save hook} {
# debug reload does: save, flush, load:
assert {[r hooks.event_count persistence-syncrdb-start] == 0}
assert {[r hooks.event_count loading-rdb-start] == 0}
r debug reload
assert {[r hooks.event_count persistence-syncrdb-start] == 1}
assert {[r hooks.event_count loading-rdb-start] == 1}
} }
test {Test flushdb hooks} { test {Test flushdb hooks} {
r flushall ;# Note: only the "end" RPUSH will survive
r select 1
r flushdb r flushdb
r select 2 assert_equal [r hooks.event_last flush-start] 9
r flushdb assert_equal [r hooks.event_last flush-end] 9
r select 9 r flushall
assert {[r llen flush-start] == 2} assert_equal [r hooks.event_last flush-start] -1
assert {[r llen flush-end] == 3} assert_equal [r hooks.event_last flush-end] -1
assert {[r lrange flush-start 0 -1] eq {1 2}} }
assert {[r lrange flush-end 0 -1] eq {-1 1 2}}
} # replication related tests
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
start_server {} {
r module load $testmodule
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
$replica replicaof $master_host $master_port
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [r info replication]]
} else {
fail "Can't turn the instance into a replica"
}
test {Test master link up hook} {
assert_equal [r hooks.event_count masterlink-up] 1
assert_equal [r hooks.event_count masterlink-down] 0
}
test {Test role-replica hook} {
assert_equal [r hooks.event_count role-replica] 1
assert_equal [r hooks.event_count role-master] 0
assert_equal [r hooks.event_last role-replica] [s 0 master_host]
}
test {Test replica-online hook} {
assert_equal [r -1 hooks.event_count replica-online] 1
assert_equal [r -1 hooks.event_count replica-offline] 0
}
test {Test master link down hook} {
r client kill type master
assert_equal [r hooks.event_count masterlink-down] 1
}
$replica replicaof no one
test {Test role-master hook} {
assert_equal [r hooks.event_count role-replica] 1
assert_equal [r hooks.event_count role-master] 1
assert_equal [r hooks.event_last role-master] {}
}
test {Test replica-offline hook} {
assert_equal [r -1 hooks.event_count replica-online] 1
assert_equal [r -1 hooks.event_count replica-offline] 1
}
# get the replica stdout, to be used by the next test
set replica_stdout [srv 0 stdout]
}
# look into the log file of the server that just exited
test {Test shutdown hook} {
assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1
}
} }
} }