Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2019-11-06 17:44:42 +01:00
commit 0f026af185
27 changed files with 1406 additions and 161 deletions

View File

@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/propagate \
--single unit/moduleapi/hooks \
--single unit/moduleapi/misc \
--single unit/moduleapi/blockonkeys \
"${@}"

View File

@ -731,7 +731,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createAOFClient();
startLoadingFile(fp, filename);
startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */
@ -746,7 +746,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@ -767,6 +767,7 @@ int loadAppendOnlyFile(char *filename) {
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
processModuleLoadingProgressEvent(1);
}
if (fgets(buf,sizeof(buf),fp) == NULL) {
@ -859,7 +860,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
stopLoading();
stopLoading(1);
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
server.aof_fsync_offset = server.aof_current_size;
@ -1400,9 +1401,11 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
@ -1465,15 +1468,18 @@ int rewriteAppendOnlyFile(char *filename) {
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}

View File

@ -151,9 +151,13 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
return lookupKey(db,key,LOOKUP_NONE);
return lookupKey(db,key,flags);
}
robj *lookupKeyWrite(redisDb *db, robj *key) {
return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
@ -461,6 +465,29 @@ int getFlushCommandFlags(client *c, int *flags) {
return C_OK;
}
/* Flushes the whole server data set. */
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyDb(-1,flags,NULL);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
@ -484,28 +511,9 @@ void flushdbCommand(client *c) {
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
server.dirty += emptyDb(-1,flags,NULL);
flushAllDataAndResetRDB(flags);
addReply(c,shared.ok);
if (server.rdb_child_pid != -1) killRDBChild();
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}
server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchroneus. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* This command implements DEL and LAZYDEL. */

View File

@ -417,7 +417,7 @@ NULL
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
int ret = rdbLoad(server.rdb_filename,NULL);
int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");

View File

@ -1625,6 +1625,27 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
return REDISMODULE_OK;
}
/* This is an helper for moduleFireServerEvent() and other functions:
* It populates the replication info structure with the appropriate
* fields depending on the version provided. If the version is not valid
* then REDISMODULE_ERR is returned. Otherwise the function returns
* REDISMODULE_OK and the structure pointed by 'ri' gets populated. */
int modulePopulateReplicationInfoStructure(void *ri, int structver) {
if (structver != 1) return REDISMODULE_ERR;
RedisModuleReplicationInfoV1 *ri1 = ri;
memset(ri1,0,sizeof(*ri1));
ri1->version = structver;
ri1->master = server.masterhost==NULL;
ri1->masterhost = server.masterhost? server.masterhost: "";
ri1->masterport = server.masterport;
ri1->replid1 = server.replid;
ri1->replid2 = server.replid2;
ri1->repl1_offset = server.master_repl_offset;
ri1->repl2_offset = server.second_replid_offset;
return REDISMODULE_OK;
}
/* Return information about the client with the specified ID (that was
* previously obtained via the RedisModule_GetClientId() API). If the
* client exists, REDISMODULE_OK is returned, otherwise REDISMODULE_ERR
@ -1677,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
return modulePopulateClientInfoStructure(ci,client,structver);
}
/* Publish a message to subscribers (see PUBLISH command). */
int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
UNUSED(ctx);
int receivers = pubsubPublishMessage(channel, message);
if (server.cluster_enabled)
clusterPropagatePublish(channel, message);
return receivers;
}
/* Return the currently selected DB. */
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
return ctx->client->db->id;
@ -1835,11 +1865,12 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
RedisModuleKey *kp;
robj *value;
int flags = mode & REDISMODULE_OPEN_KEY_NOTOUCH? LOOKUP_NOTOUCH: 0;
if (mode & REDISMODULE_WRITE) {
value = lookupKeyWrite(ctx->client->db,keyname);
value = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags);
} else {
value = lookupKeyRead(ctx->client->db,keyname);
value = lookupKeyReadWithFlags(ctx->client->db,keyname, flags);
if (value == NULL) {
return NULL;
}
@ -1964,6 +1995,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_OK;
}
/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
* If restart_aof is true, you must make sure the command that triggered this call is not
* propagated to the AOF file.
* When async is set to true, db contents will be freed by a background thread. */
void RM_ResetDataset(int restart_aof, int async) {
if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
}
/* Returns the number of keys in the current db. */
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
return dictSize(ctx->client->db->dict);
}
/* Returns a name of a random key, or NULL if current db is empty. */
RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
robj *key = dbRandomKey(ctx->client->db);
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
return key;
}
/* --------------------------------------------------------------------------
* Key API for String type
* -------------------------------------------------------------------------- */
@ -3716,6 +3769,31 @@ loaderr:
return 0;
}
/* In the context of the rdb_save method of a module data type, saves a long double
* value to the RDB file. The double can be a valid number, a NaN or infinity.
* It is possible to load back the value with RedisModule_LoadLongDouble(). */
void RM_SaveLongDouble(RedisModuleIO *io, long double value) {
if (io->error) return;
char buf[MAX_LONG_DOUBLE_CHARS];
/* Long double has different number of bits in different platforms, so we
* save it as a string type. */
size_t len = ld2string(buf,sizeof(buf),value,LD_STR_HEX);
RM_SaveStringBuffer(io,buf,len+1); /* len+1 for '\0' */
}
/* In the context of the rdb_save method of a module data type, loads back the
* long double value saved by RedisModule_SaveLongDouble(). */
long double RM_LoadLongDouble(RedisModuleIO *io) {
if (io->error) return 0;
long double value;
size_t len;
char* str = RM_LoadStringBuffer(io,&len);
if (!str) return 0;
string2ld(str,len,&value);
RM_Free(str);
return value;
}
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
* who asked for it. */
ssize_t rdbSaveModulesAux(rio *rdb, int when) {
@ -5949,8 +6027,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
*
* The following sub events are available:
*
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER
* REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA
* REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_MASTER
* REDISMODULE_SUBEVENT_REPLROLECHANGED_NOW_REPLICA
*
* The 'data' field can be casted by the callback to a
* RedisModuleReplicationInfo structure with the following fields:
@ -5960,24 +6038,30 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int masterport; // master instance port for NOW_REPLICA
* char *replid1; // Main replication ID
* char *replid2; // Secondary replication ID
* uint64_t repl1_offset; // Main replication offset
* uint64_t repl2_offset; // Offset of replid2 validity
* uint64_t main_repl_offset; // Replication offset
*
* RedisModuleEvent_Persistence
*
* This event is called when RDB saving or AOF rewriting starts
* and ends. The following sub events are available:
*
* REDISMODULE_EVENT_LOADING_RDB_START // BGSAVE start
* REDISMODULE_EVENT_LOADING_RDB_END // BGSAVE end
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE start
* REDISMODULE_EVENT_LOADING_SYNC_RDB_START // SAVE end
* REDISMODULE_EVENT_LOADING_AOF_START // AOF rewrite start
* REDISMODULE_EVENT_LOADING_AOF_END // AOF rewrite end
* REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START
* REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START
* REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START
* REDISMODULE_SUBEVENT_PERSISTENCE_ENDED
* REDISMODULE_SUBEVENT_PERSISTENCE_FAILED
*
* The above events are triggered not just when the user calls the
* relevant commands like BGSAVE, but also when a saving operation
* or AOF rewriting occurs because of internal server triggers.
* The SYNC_RDB_START sub events are happening in the forground due to
* SAVE command, FLUSHALL, or server shutdown, and the other RDB and
* AOF sub events are executed in a background fork child, so any
* action the module takes can only affect the generated AOF or RDB,
* but will not be reflected in the parent process and affect connected
* clients and commands. Also note that the AOF_START sub event may end
* up saving RDB content in case of an AOF with rdb-preamble.
*
* RedisModuleEvent_FlushDB
*
@ -5985,8 +6069,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* because of replication, after the replica synchronization)
* happened. The following sub events are available:
*
* REDISMODULE_EVENT_FLUSHDB_START
* REDISMODULE_EVENT_FLUSHDB_END
* REDISMODULE_SUBEVENT_FLUSHDB_START
* REDISMODULE_SUBEVENT_FLUSHDB_END
*
* The data pointer can be casted to a RedisModuleFlushInfo
* structure with the following fields:
@ -6010,12 +6094,15 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica is loading the RDB file from the master.
* The following sub events are available:
*
* REDISMODULE_EVENT_LOADING_RDB_START
* REDISMODULE_EVENT_LOADING_RDB_END
* REDISMODULE_EVENT_LOADING_MASTER_RDB_START
* REDISMODULE_EVENT_LOADING_MASTER_RDB_END
* REDISMODULE_EVENT_LOADING_AOF_START
* REDISMODULE_EVENT_LOADING_AOF_END
* REDISMODULE_SUBEVENT_LOADING_RDB_START
* REDISMODULE_SUBEVENT_LOADING_AOF_START
* REDISMODULE_SUBEVENT_LOADING_REPL_START
* REDISMODULE_SUBEVENT_LOADING_ENDED
* REDISMODULE_SUBEVENT_LOADING_FAILED
*
* Note that AOF loading may start with an RDB data in case of
* rdb-preamble, in which case you'll only recieve an AOF_START event.
*
*
* RedisModuleEvent_ClientChange
*
@ -6024,8 +6111,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* structure, documented in RedisModule_GetClientInfoById().
* The following sub events are available:
*
* REDISMODULE_EVENT_CLIENT_CHANGE_CONNECTED
* REDISMODULE_EVENT_CLIENT_CHANGE_DISCONNECTED
* REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED
* REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED
*
* RedisModuleEvent_Shutdown
*
@ -6038,8 +6125,8 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replica since it gets disconnected.
* The following sub events are availble:
*
* REDISMODULE_EVENT_REPLICA_CHANGE_ONLINE
* REDISMODULE_EVENT_REPLICA_CHANGE_OFFLINE
* REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE
* REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE
*
* No additional information is available so far: future versions
* of Redis will have an API in order to enumerate the replicas
@ -6054,6 +6141,11 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* this changes depending on the "hz" configuration.
* No sub events are available.
*
* The data pointer can be casted to a RedisModuleCronLoop
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
*
* RedisModuleEvent_MasterLinkChange
*
* This is called for replicas in order to notify when the
@ -6063,8 +6155,38 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* replication is happening correctly.
* The following sub events are available:
*
* REDISMODULE_EVENT_MASTER_LINK_UP
* REDISMODULE_EVENT_MASTER_LINK_DOWN
* REDISMODULE_SUBEVENT_MASTER_LINK_UP
* REDISMODULE_SUBEVENT_MASTER_LINK_DOWN
*
* RedisModuleEvent_ModuleChange
*
* This event is called when a new module is loaded or one is unloaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_MODULE_LOADED
* REDISMODULE_SUBEVENT_MODULE_UNLOADED
*
* The data pointer can be casted to a RedisModuleModuleChange
* structure with the following fields:
*
* const char* module_name; // Name of module loaded or unloaded.
* int32_t module_version; // Module version.
*
* RedisModuleEvent_LoadingProgress
*
* This event is called repeatedly called while an RDB or AOF file
* is being loaded.
* The following sub events are availble:
*
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB
* REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF
*
* The data pointer can be casted to a RedisModuleLoadingProgress
* structure with the following fields:
*
* int32_t hz; // Approximate number of events per second.
* int32_t progress; // Approximate progress between 0 and 1024,
* or -1 if unknown.
*
* The function returns REDISMODULE_OK if the module was successfully subscrived
* for the specified event. If the API is called from a wrong context then
@ -6123,7 +6245,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
if (el->event.id == eid && !el->module->in_hook) {
if (el->event.id == eid) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = el->module;
@ -6137,6 +6259,8 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
RedisModuleReplicationInfoV1 riv1;
RedisModuleModuleChangeV1 mcv1;
/* Start at DB zero by default when calling the handler. It's
* up to the specific event setup to change it when it makes
* sense. For instance for FLUSHDB events we select the correct
@ -6148,11 +6272,26 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
modulePopulateClientInfoStructure(&civ1,data,
el->event.dataver);
moduledata = &civ1;
} else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
modulePopulateReplicationInfoStructure(&riv1,el->event.dataver);
moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
} else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
RedisModule *m = data;
if (m == el->module)
continue;
mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
mcv1.module_name = m->name;
mcv1.module_version = m->ver;
moduledata = &mcv1;
} else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
moduledata = data;
}
ModulesInHooks++;
@ -6184,6 +6323,27 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) {
}
}
void processModuleLoadingProgressEvent(int is_aof) {
long long now = ustime();
static long long next_event = 0;
if (now >= next_event) {
/* Fire the loading progress modules end event. */
int progress = -1;
if (server.loading_total_bytes)
progress = (server.loading_total_bytes<<10) / server.loading_total_bytes;
RedisModuleFlushInfoV1 fi = {REDISMODULE_LOADING_PROGRESS_VERSION,
server.hz,
progress};
moduleFireServerEvent(REDISMODULE_EVENT_LOADING_PROGRESS,
is_aof?
REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF:
REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB,
&fi);
/* decide when the next event should fire. */
next_event = now + 1000000 / server.hz;
}
}
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@ -6352,6 +6512,11 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
ctx.module->blocked_clients = 0;
ctx.module->handle = handle;
serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
/* Fire the loaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_LOADED,
ctx.module);
moduleFreeContext(&ctx);
return C_OK;
}
@ -6414,6 +6579,11 @@ int moduleUnload(sds name) {
module->name, error);
}
/* Fire the unloaded modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MODULE_CHANGE,
REDISMODULE_SUBEVENT_MODULE_UNLOADED,
module);
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
dictDelete(modules,module->name);
@ -6566,6 +6736,38 @@ size_t moduleCount(void) {
return dictSize(modules);
}
/* Set the key LRU/LFU depending on server.maxmemory_policy.
* The lru_idle arg is idle time in seconds, and is only relevant if the
* eviction policy is LRU based.
* The lfu_freq arg is a logarithmic counter that provides an indication of
* the access frequencyonly (must be <= 255) and is only relevant if the
* eviction policy is LFU based.
* Either or both of them may be <0, in that case, nothing is set. */
/* return value is an indication if the lru field was updated or not. */
int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) {
if (!key->value)
return REDISMODULE_ERR;
if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0))
return REDISMODULE_OK;
return REDISMODULE_ERR;
}
/* Gets the key LRU or LFU (depending on the current eviction policy).
* One will be set to the appropiate return value, and the other will be set to -1.
* see RedisModule_SetLRUOrLFU for units and ranges.
* return value is an indication of success. */
int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) {
*lru_idle = *lfu_freq = -1;
if (!key->value)
return REDISMODULE_ERR;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
*lfu_freq = LFUDecrAndReturn(key->value);
} else {
*lru_idle = estimateObjectIdleTime(key->value)/1000;
}
return REDISMODULE_OK;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@ -6630,6 +6832,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringTruncate);
REGISTER_API(SetExpire);
REGISTER_API(GetExpire);
REGISTER_API(ResetDataset);
REGISTER_API(DbSize);
REGISTER_API(RandomKey);
REGISTER_API(ZsetAdd);
REGISTER_API(ZsetIncrby);
REGISTER_API(ZsetScore);
@ -6669,6 +6874,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(LoadDouble);
REGISTER_API(SaveFloat);
REGISTER_API(LoadFloat);
REGISTER_API(SaveLongDouble);
REGISTER_API(LoadLongDouble);
REGISTER_API(EmitAOF);
REGISTER_API(Log);
REGISTER_API(LogIOError);
@ -6757,7 +6964,10 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(InfoAddFieldLongLong);
REGISTER_API(InfoAddFieldULongLong);
REGISTER_API(GetClientInfoById);
REGISTER_API(PublishMessage);
REGISTER_API(SubscribeToServerEvent);
REGISTER_API(SetLRUOrLFU);
REGISTER_API(GetLRUOrLFU);
REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);

View File

@ -530,7 +530,7 @@ void addReplyHumanLongDouble(client *c, long double d) {
decrRefCount(o);
} else {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),d,1);
int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
addReplyProto(c,",",1);
addReplyProto(c,buf,len);
addReplyProto(c,"\r\n",2);
@ -1118,6 +1118,11 @@ void freeClient(client *c) {
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
if (c->replstate == SLAVE_STATE_ONLINE)
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
NULL);
}
/* Master/slave cleanup Case 2:

View File

@ -178,7 +178,7 @@ robj *createStringObjectFromLongLongForValue(long long value) {
* The 'humanfriendly' option is used for INCRBYFLOAT and HINCRBYFLOAT. */
robj *createStringObjectFromLongDouble(long double value, int humanfriendly) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),value,humanfriendly);
int len = ld2string(buf,sizeof(buf),value,humanfriendly? LD_STR_HUMAN: LD_STR_AUTO);
return createStringObject(buf,len);
}
@ -1209,12 +1209,13 @@ sds getMemoryDoctorReport(void) {
* The lru_idle and lru_clock args are only relevant if policy
* is MAXMEMORY_FLAG_LRU.
* Either or both of them may be <0, in that case, nothing is set. */
void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
if (lfu_freq >= 0) {
serverAssert(lfu_freq <= 255);
val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
return 1;
}
} else if (lru_idle >= 0) {
/* Provided LRU idle time is in seconds. Scale
@ -1231,7 +1232,9 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
if (lru_abs < 0)
lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX;
val->lru = lru_abs;
return 1;
}
return 0;
}
/* ======================= The OBJECT and MEMORY commands =================== */

View File

@ -1080,9 +1080,9 @@ ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
@ -1150,7 +1150,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@ -1162,7 +1162,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
@ -1199,7 +1199,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
@ -1254,18 +1254,21 @@ werr:
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
stopSaving(0);
return C_ERR;
}
@ -1291,11 +1294,12 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
}
rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@ -1317,6 +1321,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
@ -1324,12 +1329,14 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
@ -1918,23 +1925,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(size_t size) {
void startLoading(size_t size, int rdbflags) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
/* Fire the loading modules start event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_LOADING_AOF_START;
else if(rdbflags & RDBFLAGS_REPLICATION)
subevent = REDISMODULE_SUBEVENT_LOADING_REPL_START;
else
subevent = REDISMODULE_SUBEVENT_LOADING_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,subevent,NULL);
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
void startLoadingFile(FILE *fp, char* filename) {
void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
startLoading(sb.st_size);
startLoading(sb.st_size, rdbflags);
}
/* Refresh the loading progress info */
@ -1945,9 +1962,37 @@ void loadingProgress(off_t pos) {
}
/* Loading finished */
void stopLoading(void) {
void stopLoading(int success) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
/* Fire the loading modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_LOADING,
success?
REDISMODULE_SUBEVENT_LOADING_ENDED:
REDISMODULE_SUBEVENT_LOADING_FAILED,
NULL);
}
void startSaving(int rdbflags) {
/* Fire the persistence modules end event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
else if (getpid()!=server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
else
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}
void stopSaving(int success) {
/* Fire the persistence modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
success?
REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
NULL);
}
/* Track loading progress in order to serve client's from time to time
@ -1966,12 +2011,13 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
}
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@ -2182,7 +2228,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
if (server.masterhost == NULL && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@ -2243,17 +2289,17 @@ eoferr:
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
FILE *fp;
rio rdb;
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename);
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
fclose(fp);
stopLoading();
stopLoading(retval==C_OK);
return retval;
}

View File

@ -121,8 +121,10 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_SAVE_NONE 0
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
#define RDBFLAGS_REPLICATION (1<<1)
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
@ -135,7 +137,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
@ -154,7 +156,8 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif

View File

@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
startLoadingFile(fp, rdbfilename);
startLoadingFile(fp, rdbfilename, RDBFLAGS_NONE);
while(1) {
robj *key, *val;
@ -316,7 +316,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
stopLoading();
stopLoading(1);
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -327,7 +327,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
stopLoading();
stopLoading(0);
return 1;
}

View File

@ -18,6 +18,10 @@
#define REDISMODULE_READ (1<<0)
#define REDISMODULE_WRITE (1<<1)
/* RedisModule_OpenKey extra flags for the 'mode' argument.
* Avoid touching the LRU/LFU of the key when opened. */
#define REDISMODULE_OPEN_KEY_NOTOUCH (1<<16)
#define REDISMODULE_LIST_HEAD 0
#define REDISMODULE_LIST_TAIL 1
@ -181,6 +185,8 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_REPLICA_CHANGE 6
#define REDISMODULE_EVENT_MASTER_LINK_CHANGE 7
#define REDISMODULE_EVENT_CRON_LOOP 8
#define REDISMODULE_EVENT_MODULE_CHANGE 9
#define REDISMODULE_EVENT_LOADING_PROGRESS 10
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -226,18 +232,28 @@ static const RedisModuleEvent
RedisModuleEvent_MasterLinkChange = {
REDISMODULE_EVENT_MASTER_LINK_CHANGE,
1
},
RedisModuleEvent_ModuleChange = {
REDISMODULE_EVENT_MODULE_CHANGE,
1
},
RedisModuleEvent_LoadingProgress = {
REDISMODULE_EVENT_LOADING_PROGRESS,
1
};
/* Those are values that are used for the 'subevent' callback argument. */
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
#define REDISMODULE_SUBEVENT_PERSISTENCE_RDB_END 1
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 2
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_END 3
#define REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START 1
#define REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START 2
#define REDISMODULE_SUBEVENT_PERSISTENCE_ENDED 3
#define REDISMODULE_SUBEVENT_PERSISTENCE_FAILED 4
#define REDISMODULE_SUBEVENT_LOADING_RDB_START 0
#define REDISMODULE_SUBEVENT_LOADING_RDB_END 1
#define REDISMODULE_SUBEVENT_LOADING_AOF_START 2
#define REDISMODULE_SUBEVENT_LOADING_AOF_END 3
#define REDISMODULE_SUBEVENT_LOADING_AOF_START 1
#define REDISMODULE_SUBEVENT_LOADING_REPL_START 2
#define REDISMODULE_SUBEVENT_LOADING_ENDED 3
#define REDISMODULE_SUBEVENT_LOADING_FAILED 4
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED 1
@ -245,12 +261,21 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_MASTER_LINK_UP 0
#define REDISMODULE_SUBEVENT_MASTER_LINK_DOWN 1
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_CONNECTED 0
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_DISCONNECTED 1
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE 0
#define REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE 1
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER 0
#define REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA 1
#define REDISMODULE_SUBEVENT_FLUSHDB_START 0
#define REDISMODULE_SUBEVENT_FLUSHDB_END 1
#define REDISMODULE_SUBEVENT_MODULE_LOADED 0
#define REDISMODULE_SUBEVENT_MODULE_UNLOADED 1
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB 0
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
/* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
@ -286,6 +311,22 @@ typedef struct RedisModuleClientInfo {
#define RedisModuleClientInfo RedisModuleClientInfoV1
#define REDISMODULE_REPLICATIONINFO_VERSION 1
typedef struct RedisModuleReplicationInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int master; /* true if master, false if replica */
char *masterhost; /* master instance hostname for NOW_REPLICA */
int masterport; /* master instance port for NOW_REPLICA */
char *replid1; /* Main replication ID */
char *replid2; /* Secondary replication ID */
uint64_t repl1_offset; /* Main replication offset */
uint64_t repl2_offset; /* Offset of replid2 validity */
} RedisModuleReplicationInfoV1;
#define RedisModuleReplicationInfo RedisModuleReplicationInfoV1
#define REDISMODULE_FLUSHINFO_VERSION 1
typedef struct RedisModuleFlushInfo {
uint64_t version; /* Not used since this structure is never passed
@ -297,6 +338,39 @@ typedef struct RedisModuleFlushInfo {
#define RedisModuleFlushInfo RedisModuleFlushInfoV1
#define REDISMODULE_MODULE_CHANGE_VERSION 1
typedef struct RedisModuleModuleChange {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
const char* module_name;/* Name of module loaded or unloaded. */
int32_t module_version; /* Module version. */
} RedisModuleModuleChangeV1;
#define RedisModuleModuleChange RedisModuleModuleChangeV1
#define REDISMODULE_CRON_LOOP_VERSION 1
typedef struct RedisModuleCronLoopInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
} RedisModuleCronLoopV1;
#define RedisModuleCronLoop RedisModuleCronLoopV1
#define REDISMODULE_LOADING_PROGRESS_VERSION 1
typedef struct RedisModuleLoadingProgressInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
int32_t hz; /* Approximate number of events per second. */
int32_t progress; /* Approximate progress between 0 and 1024, or -1
* if unknown. */
} RedisModuleLoadingProgressV1;
#define RedisModuleLoadingProgress RedisModuleLoadingProgressV1
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@ -416,6 +490,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
@ -435,6 +512,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
@ -457,6 +535,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double valu
double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value);
float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value);
long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line);
@ -503,6 +583,8 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle);
int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
@ -619,6 +701,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringTruncate);
REDISMODULE_GET_API(GetExpire);
REDISMODULE_GET_API(SetExpire);
REDISMODULE_GET_API(ResetDataset);
REDISMODULE_GET_API(DbSize);
REDISMODULE_GET_API(RandomKey);
REDISMODULE_GET_API(ZsetAdd);
REDISMODULE_GET_API(ZsetIncrby);
REDISMODULE_GET_API(ZsetScore);
@ -658,6 +743,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(LoadDouble);
REDISMODULE_GET_API(SaveFloat);
REDISMODULE_GET_API(LoadFloat);
REDISMODULE_GET_API(SaveLongDouble);
REDISMODULE_GET_API(LoadLongDouble);
REDISMODULE_GET_API(EmitAOF);
REDISMODULE_GET_API(Log);
REDISMODULE_GET_API(LogIOError);
@ -705,7 +792,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(InfoAddFieldLongLong);
REDISMODULE_GET_API(InfoAddFieldULongLong);
REDISMODULE_GET_API(GetClientInfoById);
REDISMODULE_GET_API(PublishMessage);
REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(SetLRUOrLFU);
REDISMODULE_GET_API(GetLRUOrLFU);
REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady);
REDISMODULE_GET_API(GetBlockedClientReadyKey);

View File

@ -533,6 +533,12 @@ int masterTryPartialResynchronization(client *c) {
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
@ -868,6 +874,10 @@ void putSlaveOnline(client *slave) {
return;
}
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
@ -1542,11 +1552,11 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
stopLoading();
stopLoading(0);
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
@ -1567,7 +1577,7 @@ void readSyncBulkPayload(connection *conn) {
* gets promoted. */
return;
}
stopLoading();
stopLoading(1);
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -1614,7 +1624,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake();
return;
}
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
@ -1636,6 +1646,11 @@ void readSyncBulkPayload(connection *conn) {
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
@ -2314,12 +2329,31 @@ void replicationSetMaster(char *ip, int port) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT;
}
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
sdsfree(server.masterhost);
server.masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
@ -2348,11 +2382,22 @@ void replicationUnsetMaster(void) {
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;

View File

@ -2065,6 +2065,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0;
}
/* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);
server.cronloops++;
return 1000/server.hz;
}
@ -3697,6 +3703,9 @@ int prepareForShutdown(int flags) {
}
}
/* Fire the shutdown modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_SHUTDOWN,0,NULL);
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE,"Removing the pid file.");
@ -4782,7 +4791,7 @@ void loadDataFromDisk(void) {
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);

View File

@ -1605,6 +1605,7 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
@ -1837,10 +1838,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename);
void startLoading(size_t size);
void startLoadingFile(FILE* fp, char* filename, int rdbflags);
void startLoading(size_t size, int rdbflags);
void loadingProgress(off_t pos);
void stopLoading(void);
void stopLoading(int success);
void startSaving(int rdbflags);
void stopSaving(int success);
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
@ -1849,7 +1852,6 @@ int writeCommandsDeniedByDiskError(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
void killRDBChild(void);
/* AOF persistence */
@ -1865,6 +1867,7 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
void killAppendOnlyChild(void);
void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
@ -2085,9 +2088,10 @@ robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags);
robj *objectCommandLookup(client *c, robj *key);
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock);
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
@ -2104,6 +2108,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);

View File

@ -621,7 +621,7 @@ void hincrbyfloatCommand(client *c) {
}
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf,sizeof(buf),value,1);
int len = ld2string(buf,sizeof(buf),value,LD_STR_HUMAN);
new = sdsnewlen(buf,len);
hashTypeSet(o,c->argv[2]->ptr,new,HASH_SET_TAKE_VALUE);
addReplyBulkCBuffer(c,buf,len);

View File

@ -173,9 +173,19 @@ int streamCompareID(streamID *a, streamID *b) {
* C_ERR if an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
/* If an ID was given, check that it's greater than the last entry ID
* or return an error. */
if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
/* Generate the new entry ID. */
streamID id;
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
* overflow (and wrap-around) when incrementing the sequence
part. */
if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
/* Add the new entry. */
raxIterator ri;
@ -192,13 +202,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
}
raxStop(&ri);
/* Generate the new entry ID. */
streamID id;
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */

View File

@ -510,15 +510,17 @@ int d2string(char *buf, size_t len, double value) {
return len;
}
/* Convert a long double into a string. If humanfriendly is non-zero
* it does not use exponential format and trims trailing zeroes at the end,
* however this results in loss of precision. Otherwise exp format is used
* and the output of snprintf() is not modified.
/* Create a string object from a long double.
* If mode is humanfriendly it does not use exponential format and trims trailing
* zeroes at the end (may result in loss of precision).
* If mode is default exp format is used and the output of snprintf()
* is not modified (may result in loss of precision).
* If mode is hex hexadecimal format is used (no loss of precision)
*
* The function returns the length of the string or zero if there was not
* enough buffer room to store it. */
int ld2string(char *buf, size_t len, long double value, int humanfriendly) {
size_t l;
int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
size_t l = 0;
if (isinf(value)) {
/* Libc in odd systems (Hi Solaris!) will format infinite in a
@ -531,26 +533,36 @@ int ld2string(char *buf, size_t len, long double value, int humanfriendly) {
memcpy(buf,"-inf",4);
l = 4;
}
} else if (humanfriendly) {
/* We use 17 digits precision since with 128 bit floats that precision
* after rounding is able to represent most small decimal numbers in a
* way that is "non surprising" for the user (that is, most small
* decimal numbers will be represented in a way that when converted
* back into a string are exactly the same as what the user typed.) */
l = snprintf(buf,len,"%.17Lf", value);
if (l+1 > len) return 0; /* No room. */
/* Now remove trailing zeroes after the '.' */
if (strchr(buf,'.') != NULL) {
char *p = buf+l-1;
while(*p == '0') {
p--;
l--;
}
if (*p == '.') l--;
}
} else {
l = snprintf(buf,len,"%.17Lg", value);
if (l+1 > len) return 0; /* No room. */
switch (mode) {
case LD_STR_AUTO:
l = snprintf(buf,len,"%.17Lg",value);
if (l+1 > len) return 0; /* No room. */
break;
case LD_STR_HEX:
l = snprintf(buf,len,"%La",value);
if (l+1 > len) return 0; /* No room. */
break;
case LD_STR_HUMAN:
/* We use 17 digits precision since with 128 bit floats that precision
* after rounding is able to represent most small decimal numbers in a
* way that is "non surprising" for the user (that is, most small
* decimal numbers will be represented in a way that when converted
* back into a string are exactly the same as what the user typed.) */
l = snprintf(buf,len,"%.17Lf",value);
if (l+1 > len) return 0; /* No room. */
/* Now remove trailing zeroes after the '.' */
if (strchr(buf,'.') != NULL) {
char *p = buf+l-1;
while(*p == '0') {
p--;
l--;
}
if (*p == '.') l--;
}
break;
default: return 0; /* Invalid mode. */
}
}
buf[l] = '\0';
return l;

View File

@ -38,6 +38,13 @@
* This should be the size of the buffer given to ld2string */
#define MAX_LONG_DOUBLE_CHARS 5*1024
/* long double to string convertion options */
typedef enum {
LD_STR_AUTO, /* %.17Lg */
LD_STR_HUMAN, /* %.17Lf + Trimming of trailing zeros */
LD_STR_HEX /* %La */
} ld2string_mode;
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
int stringmatch(const char *p, const char *s, int nocase);
int stringmatchlen_fuzz_test(void);
@ -49,7 +56,7 @@ int string2ll(const char *s, size_t slen, long long *value);
int string2l(const char *s, size_t slen, long *value);
int string2ld(const char *s, size_t slen, long double *dp);
int d2string(char *buf, size_t len, double value);
int ld2string(char *buf, size_t len, long double value, int humanfriendly);
int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
sds getAbsolutePath(char *filename);
unsigned long getTimeZone(void);
int pathIsBaseName(char *path);

View File

@ -18,7 +18,8 @@ TEST_MODULES = \
infotest.so \
propagate.so \
misc.so \
hooks.so
hooks.so \
blockonkeys.so
.PHONY: all

261
tests/modules/blockonkeys.c Normal file
View File

@ -0,0 +1,261 @@
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <string.h>
#include <assert.h>
#include <unistd.h>
#define LIST_SIZE 1024
typedef struct {
long long list[LIST_SIZE];
long long length;
} fsl_t; /* Fixed-size list */
static RedisModuleType *fsltype = NULL;
fsl_t *fsl_type_create() {
fsl_t *o;
o = RedisModule_Alloc(sizeof(*o));
o->length = 0;
return o;
}
void fsl_type_free(fsl_t *o) {
RedisModule_Free(o);
}
/* ========================== "fsltype" type methods ======================= */
void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
if (encver != 0) {
return NULL;
}
fsl_t *fsl = fsl_type_create();
fsl->length = RedisModule_LoadUnsigned(rdb);
for (long long i = 0; i < fsl->length; i++)
fsl->list[i] = RedisModule_LoadSigned(rdb);
return fsl;
}
void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
fsl_t *fsl = value;
RedisModule_SaveUnsigned(rdb,fsl->length);
for (long long i = 0; i < fsl->length; i++)
RedisModule_SaveSigned(rdb, fsl->list[i]);
}
void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
fsl_t *fsl = value;
for (long long i = 0; i < fsl->length; i++)
RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
}
void fsl_free(void *value) {
fsl_type_free(value);
}
/* ========================== helper methods ======================= */
int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
RedisModule_CloseKey(key);
if (reply_on_failure)
RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
return 0;
}
/* Create an empty value object if the key is currently empty. */
if (type == REDISMODULE_KEYTYPE_EMPTY) {
if (!create) {
/* Key is empty but we cannot create */
RedisModule_CloseKey(key);
*fsl = NULL;
return 1;
}
*fsl = fsl_type_create();
RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
} else {
*fsl = RedisModule_ModuleTypeGetValue(key);
}
RedisModule_CloseKey(key);
return 1;
}
/* ========================== commands ======================= */
/* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
* It must be greater than the element in the head of the list. */
int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3)
return RedisModule_WrongArity(ctx);
long long ele;
if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
fsl_t *fsl;
if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
return REDISMODULE_OK;
if (fsl->length == LIST_SIZE)
return RedisModule_ReplyWithError(ctx,"ERR list is full");
if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
fsl->list[fsl->length++] = ele;
if (fsl->length >= 2)
RedisModule_SignalKeyAsReady(ctx, argv[1]);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
fsl_t *fsl;
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
return REDISMODULE_ERR;
if (!fsl || fsl->length < 2)
return REDISMODULE_ERR;
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
return REDISMODULE_OK;
}
int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
}
/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
* When that happens, unblock client and pop the last two elements (from the right). */
int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3)
return RedisModule_WrongArity(ctx);
long long timeout;
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
fsl_t *fsl;
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
return REDISMODULE_OK;
if (!fsl || fsl->length < 2) {
/* Key is empty or has <2 elements, we must block */
RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
NULL, timeout, &argv[1], 1, NULL);
} else {
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
}
return REDISMODULE_OK;
}
int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx);
fsl_t *fsl;
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
return REDISMODULE_ERR;
if (!fsl || fsl->list[fsl->length-1] <= gt)
return REDISMODULE_ERR;
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
return REDISMODULE_OK;
}
int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
}
void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
/* Nothing to do because privdata is actually a 'long long',
* not a pointer to the heap */
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(privdata);
}
/* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
* When that happens, unblock client and pop the last element (from the right). */
int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 4)
return RedisModule_WrongArity(ctx);
long long gt;
if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
long long timeout;
if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
fsl_t *fsl;
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
return REDISMODULE_OK;
if (!fsl || fsl->list[fsl->length-1] <= gt) {
/* Key is empty or has <2 elements, we must block */
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt);
} else {
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
}
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
return REDISMODULE_ERR;
RedisModuleTypeMethods tm = {
.version = REDISMODULE_TYPE_METHOD_VERSION,
.rdb_load = fsl_rdb_load,
.rdb_save = fsl_rdb_save,
.aof_rewrite = fsl_aofrw,
.mem_usage = NULL,
.free = fsl_free,
.digest = NULL
};
fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
if (fsltype == NULL)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -30,36 +30,227 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <stdio.h>
#include <string.h>
/* We need to store events to be able to test and see what we got, and we can't
* store them in the key-space since that would mess up rdb loading (duplicates)
* and be lost of flushdb. */
RedisModuleDict *event_log = NULL;
typedef struct EventElement {
long count;
RedisModuleString *last_val_string;
long last_val_int;
} EventElement;
void LogStringEvent(RedisModuleCtx *ctx, const char* keyname, const char* data) {
EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
if (!event) {
event = RedisModule_Alloc(sizeof(EventElement));
memset(event, 0, sizeof(EventElement));
RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
}
if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
event->last_val_string = RedisModule_CreateString(ctx, data, strlen(data));
event->count++;
}
void LogNumericEvent(RedisModuleCtx *ctx, const char* keyname, long data) {
REDISMODULE_NOT_USED(ctx);
EventElement *event = RedisModule_DictGetC(event_log, (void*)keyname, strlen(keyname), NULL);
if (!event) {
event = RedisModule_Alloc(sizeof(EventElement));
memset(event, 0, sizeof(EventElement));
RedisModule_DictSetC(event_log, (void*)keyname, strlen(keyname), event);
}
event->last_val_int = data;
event->count++;
}
void FreeEvent(RedisModuleCtx *ctx, EventElement *event) {
if (event->last_val_string)
RedisModule_FreeString(ctx, event->last_val_string);
RedisModule_Free(event);
}
int cmdEventCount(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
RedisModule_ReplyWithLongLong(ctx, event? event->count: 0);
return REDISMODULE_OK;
}
int cmdEventLast(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
EventElement *event = RedisModule_DictGet(event_log, argv[1], NULL);
if (event && event->last_val_string)
RedisModule_ReplyWithString(ctx, event->last_val_string);
else if (event)
RedisModule_ReplyWithLongLong(ctx, event->last_val_int);
else
RedisModule_ReplyWithNull(ctx);
return REDISMODULE_OK;
}
void clearEvents(RedisModuleCtx *ctx)
{
RedisModuleString *key;
EventElement *event;
RedisModuleDictIter *iter = RedisModule_DictIteratorStart(event_log, "^", NULL);
while((key = RedisModule_DictNext(ctx, iter, (void**)&event)) != NULL) {
event->count = 0;
event->last_val_int = 0;
if (event->last_val_string) RedisModule_FreeString(ctx, event->last_val_string);
event->last_val_string = NULL;
RedisModule_DictDel(event_log, key, NULL);
RedisModule_Free(event);
}
RedisModule_DictIteratorStop(iter);
}
int cmdEventsClear(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argc);
REDISMODULE_NOT_USED(argv);
clearEvents(ctx);
return REDISMODULE_OK;
}
/* Client state change callback. */
void clientChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleClientInfo *ci = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED) ?
"connected" : "disconnected";
RedisModuleCallReply *reply;
RedisModule_SelectDb(ctx,9);
reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)ci->id);
RedisModule_FreeCallReply(reply);
"client-connected" : "client-disconnected";
LogNumericEvent(ctx, keyname, ci->id);
}
void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
RedisModuleFlushInfo *fi = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
"flush-start" : "flush-end";
RedisModuleCallReply *reply;
RedisModule_SelectDb(ctx,9);
reply = RedisModule_Call(ctx,"RPUSH","cl",keyname,(long)fi->dbnum);
RedisModule_FreeCallReply(reply);
LogNumericEvent(ctx, keyname, fi->dbnum);
}
void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
RedisModuleReplicationInfo *ri = data;
char *keyname = (sub == REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER) ?
"role-master" : "role-replica";
LogStringEvent(ctx, keyname, ri->masterhost);
}
void replicationChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = (sub == REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE) ?
"replica-online" : "replica-offline";
LogNumericEvent(ctx, keyname, 0);
}
void rasterLinkChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = (sub == REDISMODULE_SUBEVENT_MASTER_LINK_UP) ?
"masterlink-up" : "masterlink-down";
LogNumericEvent(ctx, keyname, 0);
}
void persistenceCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = NULL;
switch (sub) {
case REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START: keyname = "persistence-rdb-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START: keyname = "persistence-aof-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START: keyname = "persistence-syncrdb-start"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_ENDED: keyname = "persistence-end"; break;
case REDISMODULE_SUBEVENT_PERSISTENCE_FAILED: keyname = "persistence-failed"; break;
}
/* modifying the keyspace from the fork child is not an option, using log instead */
RedisModule_Log(ctx, "warning", "module-event-%s", keyname);
if (sub == REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START)
LogNumericEvent(ctx, keyname, 0);
}
void loadingCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
char *keyname = NULL;
switch (sub) {
case REDISMODULE_SUBEVENT_LOADING_RDB_START: keyname = "loading-rdb-start"; break;
case REDISMODULE_SUBEVENT_LOADING_AOF_START: keyname = "loading-aof-start"; break;
case REDISMODULE_SUBEVENT_LOADING_REPL_START: keyname = "loading-repl-start"; break;
case REDISMODULE_SUBEVENT_LOADING_ENDED: keyname = "loading-end"; break;
case REDISMODULE_SUBEVENT_LOADING_FAILED: keyname = "loading-failed"; break;
}
LogNumericEvent(ctx, keyname, 0);
}
void loadingProgressCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
RedisModuleLoadingProgress *ei = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_LOADING_PROGRESS_RDB) ?
"loading-progress-rdb" : "loading-progress-aof";
LogNumericEvent(ctx, keyname, ei->progress);
}
void shutdownCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
REDISMODULE_NOT_USED(sub);
RedisModule_Log(ctx, "warning", "module-event-%s", "shutdown");
}
void cronLoopCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(sub);
RedisModuleCronLoop *ei = data;
LogNumericEvent(ctx, "cron-loop", ei->hz);
}
void moduleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
RedisModuleModuleChange *ei = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_MODULE_LOADED) ?
"module-loaded" : "module-unloaded";
LogStringEvent(ctx, keyname, ei->module_name);
}
/* This function must be present on each Redis module. It is used in order to
@ -71,9 +262,50 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_Init(ctx,"testhook",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* replication related hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ReplicationRoleChanged, roleChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ReplicaChange, replicationChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_MasterLinkChange, rasterLinkChangeCallback);
/* persistence related hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Persistence, persistenceCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Loading, loadingCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_LoadingProgress, loadingProgressCallback);
/* other hooks */
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ClientChange, clientChangeCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_FlushDB, flushdbCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_Shutdown, shutdownCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_CronLoop, cronLoopCallback);
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ModuleChange, moduleChangeCallback);
event_log = RedisModule_CreateDict(ctx);
if (RedisModule_CreateCommand(ctx,"hooks.event_count", cmdEventCount,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.event_last", cmdEventLast,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hooks.clear", cmdEventsClear,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
clearEvents(ctx);
RedisModule_FreeDict(ctx, event_log);
event_log = NULL;
return REDISMODULE_OK;
}

View File

@ -40,6 +40,65 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModule_ResetDataset(1, 0);
RedisModule_ReplyWithCString(ctx, "Ok");
return REDISMODULE_OK;
}
int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
long long ll = RedisModule_DbSize(ctx);
RedisModule_ReplyWithLongLong(ctx, ll);
return REDISMODULE_OK;
}
int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleString *str = RedisModule_RandomKey(ctx);
RedisModule_ReplyWithString(ctx, str);
RedisModule_FreeString(ctx, str);
return REDISMODULE_OK;
}
int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc<2) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
RedisModuleString *keyname = argv[1];
RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH);
long long lru, lfu;
RedisModule_GetLRUOrLFU(key, &lfu, &lru);
RedisModule_ReplyWithLongLong(ctx, lru);
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc<3) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
RedisModuleString *keyname = argv[1];
RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH);
long long lru;
RedisModule_StringToLongLong(argv[2], &lru);
RedisModule_SetLRUOrLFU(key, -1, lru);
RedisModule_ReplyWithCString(ctx, "Ok");
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@ -50,6 +109,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -15,11 +15,16 @@ RedisModuleString *after_str = NULL;
void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
int count = RedisModule_LoadSigned(rdb);
RedisModuleString *str = RedisModule_LoadString(rdb);
float f = RedisModule_LoadFloat(rdb);
long double ld = RedisModule_LoadLongDouble(rdb);
if (RedisModule_IsIOError(rdb))
return NULL;
/* Using the values only after checking for io errors. */
assert(count==1);
assert(encver==1);
RedisModuleString *str = RedisModule_LoadString(rdb);
assert(f==1.5f);
assert(ld==0.333333333333333333L);
return str;
}
@ -27,6 +32,8 @@ void testrdb_type_save(RedisModuleIO *rdb, void *value) {
RedisModuleString *str = (RedisModuleString*)value;
RedisModule_SaveSigned(rdb, 1);
RedisModule_SaveString(rdb, str);
RedisModule_SaveFloat(rdb, 1.5);
RedisModule_SaveLongDouble(rdb, 0.333333333333333333L);
}
void testrdb_aux_save(RedisModuleIO *rdb, int when) {

View File

@ -0,0 +1,85 @@
set testmodule [file normalize tests/modules/blockonkeys.so]
start_server {tags {"modules"}} {
r module load $testmodule
test {Module client blocked on keys (no metadata): No block} {
r del k
r fsl.push k 33
r fsl.push k 34
r fsl.bpop2 k 0
} {34 33}
test {Module client blocked on keys (no metadata): Timeout} {
r del k
set rd [redis_deferring_client]
r fsl.push k 33
$rd fsl.bpop2 k 1
assert_equal {Request timedout} [$rd read]
}
test {Module client blocked on keys (no metadata): Blocked, case 1} {
r del k
set rd [redis_deferring_client]
r fsl.push k 33
$rd fsl.bpop2 k 0
r fsl.push k 34
assert_equal {34 33} [$rd read]
}
test {Module client blocked on keys (no metadata): Blocked, case 2} {
r del k
set rd [redis_deferring_client]
r fsl.push k 33
r fsl.push k 34
$rd fsl.bpop2 k 0
assert_equal {34 33} [$rd read]
}
test {Module client blocked on keys (with metadata): No block} {
r del k
r fsl.push k 34
r fsl.bpopgt k 30 0
} {34}
test {Module client blocked on keys (with metadata): Timeout} {
r del k
set rd [redis_deferring_client]
r fsl.push k 33
$rd fsl.bpopgt k 35 1
assert_equal {Request timedout} [$rd read]
}
test {Module client blocked on keys (with metadata): Blocked, case 1} {
r del k
set rd [redis_deferring_client]
r fsl.push k 33
$rd fsl.bpopgt k 33 0
r fsl.push k 34
assert_equal {34} [$rd read]
}
test {Module client blocked on keys (with metadata): Blocked, case 2} {
r del k
set rd [redis_deferring_client]
$rd fsl.bpopgt k 35 0
r fsl.push k 33
r fsl.push k 34
r fsl.push k 35
r fsl.push k 36
assert_equal {36} [$rd read]
}
test {Module client blocked on keys does not wake up on wrong type} {
r del k
set rd [redis_deferring_client]
$rd fsl.bpop2 k 0
r lpush k 12
r lpush k 13
r lpush k 14
r del k
r fsl.push k 33
r fsl.push k 34
assert_equal {34 33} [$rd read]
}
}

View File

@ -3,26 +3,138 @@ set testmodule [file normalize tests/modules/hooks.so]
tags "modules" {
start_server {} {
r module load $testmodule
r config set appendonly yes
test {Test clients connection / disconnection hooks} {
for {set j 0} {$j < 2} {incr j} {
set rd1 [redis_deferring_client]
$rd1 close
}
assert {[r llen connected] > 1}
assert {[r llen disconnected] > 1}
assert {[r hooks.event_count client-connected] > 1}
assert {[r hooks.event_count client-disconnected] > 1}
}
test {Test module cron hook} {
after 100
assert {[r hooks.event_count cron-loop] > 0}
set hz [r hooks.event_last cron-loop]
assert_equal $hz 10
}
test {Test module loaded / unloaded hooks} {
set othermodule [file normalize tests/modules/infotest.so]
r module load $othermodule
r module unload infotest
assert_equal [r hooks.event_last module-loaded] "infotest"
assert_equal [r hooks.event_last module-unloaded] "infotest"
}
test {Test module aofrw hook} {
r debug populate 1000 foo 10000 ;# 10mb worth of data
r config set rdbcompression no ;# rdb progress is only checked once in 2mb
r BGREWRITEAOF
waitForBgrewriteaof r
assert_equal [string match {*module-event-persistence-aof-start*} [exec tail -20 < [srv 0 stdout]]] 1
assert_equal [string match {*module-event-persistence-end*} [exec tail -20 < [srv 0 stdout]]] 1
}
test {Test module aof load and rdb/aof progress hooks} {
# create some aof tail (progress is checked only once in 1000 commands)
for {set j 0} {$j < 4000} {incr j} {
r set "bar$j" x
}
# set some configs that will cause many loading progress events during aof loading
r config set key-load-delay 1
r config set dynamic-hz no
r config set hz 500
r DEBUG LOADAOF
assert_equal [r hooks.event_last loading-aof-start] 0
assert_equal [r hooks.event_last loading-end] 0
assert {[r hooks.event_count loading-rdb-start] == 0}
assert {[r hooks.event_count loading-progress-rdb] >= 2} ;# comes from the preamble section
assert {[r hooks.event_count loading-progress-aof] >= 2}
}
# undo configs before next test
r config set dynamic-hz yes
r config set key-load-delay 0
test {Test module rdb save hook} {
# debug reload does: save, flush, load:
assert {[r hooks.event_count persistence-syncrdb-start] == 0}
assert {[r hooks.event_count loading-rdb-start] == 0}
r debug reload
assert {[r hooks.event_count persistence-syncrdb-start] == 1}
assert {[r hooks.event_count loading-rdb-start] == 1}
}
test {Test flushdb hooks} {
r flushall ;# Note: only the "end" RPUSH will survive
r select 1
r flushdb
r select 2
r flushdb
r select 9
assert {[r llen flush-start] == 2}
assert {[r llen flush-end] == 3}
assert {[r lrange flush-start 0 -1] eq {1 2}}
assert {[r lrange flush-end 0 -1] eq {-1 1 2}}
assert_equal [r hooks.event_last flush-start] 9
assert_equal [r hooks.event_last flush-end] 9
r flushall
assert_equal [r hooks.event_last flush-start] -1
assert_equal [r hooks.event_last flush-end] -1
}
# replication related tests
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
start_server {} {
r module load $testmodule
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
$replica replicaof $master_host $master_port
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [r info replication]]
} else {
fail "Can't turn the instance into a replica"
}
test {Test master link up hook} {
assert_equal [r hooks.event_count masterlink-up] 1
assert_equal [r hooks.event_count masterlink-down] 0
}
test {Test role-replica hook} {
assert_equal [r hooks.event_count role-replica] 1
assert_equal [r hooks.event_count role-master] 0
assert_equal [r hooks.event_last role-replica] [s 0 master_host]
}
test {Test replica-online hook} {
assert_equal [r -1 hooks.event_count replica-online] 1
assert_equal [r -1 hooks.event_count replica-offline] 0
}
test {Test master link down hook} {
r client kill type master
assert_equal [r hooks.event_count masterlink-down] 1
}
$replica replicaof no one
test {Test role-master hook} {
assert_equal [r hooks.event_count role-replica] 1
assert_equal [r hooks.event_count role-master] 1
assert_equal [r hooks.event_last role-master] {}
}
test {Test replica-offline hook} {
assert_equal [r -1 hooks.event_count replica-online] 1
assert_equal [r -1 hooks.event_count replica-offline] 1
}
# get the replica stdout, to be used by the next test
set replica_stdout [srv 0 stdout]
}
# look into the log file of the server that just exited
test {Test shutdown hook} {
assert_equal [string match {*module-event-shutdown*} [exec tail -5 < $replica_stdout]] 1
}
}
}

View File

@ -16,4 +16,23 @@ start_server {tags {"modules"}} {
assert { [string match "*cmdstat_module*" $info] }
}
test {test module db commands} {
r set x foo
set key [r test.randomkey]
assert_equal $key "x"
assert_equal [r test.dbsize] 1
r test.flushall
assert_equal [r test.dbsize] 0
}
test {test modle lru api} {
r set x foo
set lru [r test.getlru x]
assert { $lru <= 1 }
r test.setlru x 100
set idle [r object idletime x]
assert { $idle >= 100 }
set lru [r test.getlru x]
assert { $lru >= 100 }
}
}

View File

@ -79,6 +79,12 @@ start_server {
assert {[streamCompareID $id2 $id3] == -1}
}
test {XADD IDs correctly report an error when overflowing} {
r DEL mystream
r xadd mystream 18446744073709551615-18446744073709551615 a b
assert_error ERR* {r xadd mystream * c d}
}
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {