Merge pull request #6532 from oranagra/rm_misc_commands
Module API for PUBLISH, FLUSHALL, RANDOMKEY, DBSIZE
This commit is contained in:
commit
327cb45ed5
44
src/db.c
44
src/db.c
@ -461,6 +461,29 @@ int getFlushCommandFlags(client *c, int *flags) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Flushes the whole server data set. */
|
||||||
|
void flushAllDataAndResetRDB(int flags) {
|
||||||
|
server.dirty += emptyDb(-1,flags,NULL);
|
||||||
|
if (server.rdb_child_pid != -1) killRDBChild();
|
||||||
|
if (server.saveparamslen > 0) {
|
||||||
|
/* Normally rdbSave() will reset dirty, but we don't want this here
|
||||||
|
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
|
||||||
|
int saved_dirty = server.dirty;
|
||||||
|
rdbSaveInfo rsi, *rsiptr;
|
||||||
|
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||||
|
rdbSave(server.rdb_filename,rsiptr);
|
||||||
|
server.dirty = saved_dirty;
|
||||||
|
}
|
||||||
|
server.dirty++;
|
||||||
|
#if defined(USE_JEMALLOC)
|
||||||
|
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
|
||||||
|
* for large databases, flushdb blocks for long anyway, so a bit more won't
|
||||||
|
* harm and this way the flush and purge will be synchroneus. */
|
||||||
|
if (!(flags & EMPTYDB_ASYNC))
|
||||||
|
jemalloc_purge();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/* FLUSHDB [ASYNC]
|
/* FLUSHDB [ASYNC]
|
||||||
*
|
*
|
||||||
* Flushes the currently SELECTed Redis DB. */
|
* Flushes the currently SELECTed Redis DB. */
|
||||||
@ -484,28 +507,9 @@ void flushdbCommand(client *c) {
|
|||||||
* Flushes the whole server data set. */
|
* Flushes the whole server data set. */
|
||||||
void flushallCommand(client *c) {
|
void flushallCommand(client *c) {
|
||||||
int flags;
|
int flags;
|
||||||
|
|
||||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||||
server.dirty += emptyDb(-1,flags,NULL);
|
flushAllDataAndResetRDB(flags);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
if (server.rdb_child_pid != -1) killRDBChild();
|
|
||||||
if (server.saveparamslen > 0) {
|
|
||||||
/* Normally rdbSave() will reset dirty, but we don't want this here
|
|
||||||
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
|
|
||||||
int saved_dirty = server.dirty;
|
|
||||||
rdbSaveInfo rsi, *rsiptr;
|
|
||||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
|
||||||
rdbSave(server.rdb_filename,rsiptr);
|
|
||||||
server.dirty = saved_dirty;
|
|
||||||
}
|
|
||||||
server.dirty++;
|
|
||||||
#if defined(USE_JEMALLOC)
|
|
||||||
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
|
|
||||||
* for large databases, flushdb blocks for long anyway, so a bit more won't
|
|
||||||
* harm and this way the flush and purge will be synchroneus. */
|
|
||||||
if (!(flags & EMPTYDB_ASYNC))
|
|
||||||
jemalloc_purge();
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This command implements DEL and LAZYDEL. */
|
/* This command implements DEL and LAZYDEL. */
|
||||||
|
35
src/module.c
35
src/module.c
@ -1698,6 +1698,15 @@ int RM_GetClientInfoById(void *ci, uint64_t id) {
|
|||||||
return modulePopulateClientInfoStructure(ci,client,structver);
|
return modulePopulateClientInfoStructure(ci,client,structver);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Publish a message to subscribers (see PUBLISH command). */
|
||||||
|
int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) {
|
||||||
|
UNUSED(ctx);
|
||||||
|
int receivers = pubsubPublishMessage(channel, message);
|
||||||
|
if (server.cluster_enabled)
|
||||||
|
clusterPropagatePublish(channel, message);
|
||||||
|
return receivers;
|
||||||
|
}
|
||||||
|
|
||||||
/* Return the currently selected DB. */
|
/* Return the currently selected DB. */
|
||||||
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
||||||
return ctx->client->db->id;
|
return ctx->client->db->id;
|
||||||
@ -1985,6 +1994,28 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Performs similar operation to FLUSHALL, and optionally start a new AOF file (if enabled)
|
||||||
|
* If restart_aof is true, you must make sure the command that triggered this call is not
|
||||||
|
* propagated to the AOF file.
|
||||||
|
* When async is set to true, db contents will be freed by a background thread. */
|
||||||
|
void RM_ResetDataset(int restart_aof, int async) {
|
||||||
|
if (restart_aof && server.aof_state != AOF_OFF) stopAppendOnly();
|
||||||
|
flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS);
|
||||||
|
if (server.aof_enabled && restart_aof) restartAOFAfterSYNC();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns the number of keys in the current db. */
|
||||||
|
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
|
||||||
|
return dictSize(ctx->client->db->dict);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns a name of a random key, or NULL if current db is empty. */
|
||||||
|
RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
|
||||||
|
robj *key = dbRandomKey(ctx->client->db);
|
||||||
|
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,key);
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Key API for String type
|
* Key API for String type
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -6743,6 +6774,9 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(StringTruncate);
|
REGISTER_API(StringTruncate);
|
||||||
REGISTER_API(SetExpire);
|
REGISTER_API(SetExpire);
|
||||||
REGISTER_API(GetExpire);
|
REGISTER_API(GetExpire);
|
||||||
|
REGISTER_API(ResetDataset);
|
||||||
|
REGISTER_API(DbSize);
|
||||||
|
REGISTER_API(RandomKey);
|
||||||
REGISTER_API(ZsetAdd);
|
REGISTER_API(ZsetAdd);
|
||||||
REGISTER_API(ZsetIncrby);
|
REGISTER_API(ZsetIncrby);
|
||||||
REGISTER_API(ZsetScore);
|
REGISTER_API(ZsetScore);
|
||||||
@ -6870,6 +6904,7 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(InfoAddFieldLongLong);
|
REGISTER_API(InfoAddFieldLongLong);
|
||||||
REGISTER_API(InfoAddFieldULongLong);
|
REGISTER_API(InfoAddFieldULongLong);
|
||||||
REGISTER_API(GetClientInfoById);
|
REGISTER_API(GetClientInfoById);
|
||||||
|
REGISTER_API(PublishMessage);
|
||||||
REGISTER_API(SubscribeToServerEvent);
|
REGISTER_API(SubscribeToServerEvent);
|
||||||
REGISTER_API(BlockClientOnKeys);
|
REGISTER_API(BlockClientOnKeys);
|
||||||
REGISTER_API(SignalKeyAsReady);
|
REGISTER_API(SignalKeyAsReady);
|
||||||
|
@ -486,6 +486,9 @@ char *REDISMODULE_API_FUNC(RedisModule_StringDMA)(RedisModuleKey *key, size_t *l
|
|||||||
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
|
int REDISMODULE_API_FUNC(RedisModule_StringTruncate)(RedisModuleKey *key, size_t newlen);
|
||||||
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
|
mstime_t REDISMODULE_API_FUNC(RedisModule_GetExpire)(RedisModuleKey *key);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
|
int REDISMODULE_API_FUNC(RedisModule_SetExpire)(RedisModuleKey *key, mstime_t expire);
|
||||||
|
void REDISMODULE_API_FUNC(RedisModule_ResetDataset)(int restart_aof, int async);
|
||||||
|
unsigned long long REDISMODULE_API_FUNC(RedisModule_DbSize)(RedisModuleCtx *ctx);
|
||||||
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_RandomKey)(RedisModuleCtx *ctx);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
|
int REDISMODULE_API_FUNC(RedisModule_ZsetAdd)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
|
int REDISMODULE_API_FUNC(RedisModule_ZsetIncrby)(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
|
int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModuleString *ele, double *score);
|
||||||
@ -505,6 +508,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
|
|||||||
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
|
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
|
||||||
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
|
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
|
int REDISMODULE_API_FUNC(RedisModule_GetClientInfoById)(void *ci, uint64_t id);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
|
int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx);
|
||||||
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
|
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
|
||||||
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
|
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
|
||||||
@ -689,6 +693,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(StringTruncate);
|
REDISMODULE_GET_API(StringTruncate);
|
||||||
REDISMODULE_GET_API(GetExpire);
|
REDISMODULE_GET_API(GetExpire);
|
||||||
REDISMODULE_GET_API(SetExpire);
|
REDISMODULE_GET_API(SetExpire);
|
||||||
|
REDISMODULE_GET_API(ResetDataset);
|
||||||
|
REDISMODULE_GET_API(DbSize);
|
||||||
|
REDISMODULE_GET_API(RandomKey);
|
||||||
REDISMODULE_GET_API(ZsetAdd);
|
REDISMODULE_GET_API(ZsetAdd);
|
||||||
REDISMODULE_GET_API(ZsetIncrby);
|
REDISMODULE_GET_API(ZsetIncrby);
|
||||||
REDISMODULE_GET_API(ZsetScore);
|
REDISMODULE_GET_API(ZsetScore);
|
||||||
@ -775,6 +782,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(InfoAddFieldLongLong);
|
REDISMODULE_GET_API(InfoAddFieldLongLong);
|
||||||
REDISMODULE_GET_API(InfoAddFieldULongLong);
|
REDISMODULE_GET_API(InfoAddFieldULongLong);
|
||||||
REDISMODULE_GET_API(GetClientInfoById);
|
REDISMODULE_GET_API(GetClientInfoById);
|
||||||
|
REDISMODULE_GET_API(PublishMessage);
|
||||||
REDISMODULE_GET_API(SubscribeToServerEvent);
|
REDISMODULE_GET_API(SubscribeToServerEvent);
|
||||||
REDISMODULE_GET_API(BlockClientOnKeys);
|
REDISMODULE_GET_API(BlockClientOnKeys);
|
||||||
REDISMODULE_GET_API(SignalKeyAsReady);
|
REDISMODULE_GET_API(SignalKeyAsReady);
|
||||||
|
@ -1864,6 +1864,7 @@ void aofRewriteBufferReset(void);
|
|||||||
unsigned long aofRewriteBufferSize(void);
|
unsigned long aofRewriteBufferSize(void);
|
||||||
ssize_t aofReadDiffFromParent(void);
|
ssize_t aofReadDiffFromParent(void);
|
||||||
void killAppendOnlyChild(void);
|
void killAppendOnlyChild(void);
|
||||||
|
void restartAOFAfterSYNC();
|
||||||
|
|
||||||
/* Child info */
|
/* Child info */
|
||||||
void openChildInfoPipe(void);
|
void openChildInfoPipe(void);
|
||||||
@ -2103,6 +2104,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
|
|||||||
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
||||||
long long emptyDb(int dbnum, int flags, void(callback)(void*));
|
long long emptyDb(int dbnum, int flags, void(callback)(void*));
|
||||||
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
|
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
|
||||||
|
void flushAllDataAndResetRDB(int flags);
|
||||||
long long dbTotalServerKeyCount();
|
long long dbTotalServerKeyCount();
|
||||||
|
|
||||||
int selectDb(client *c, int id);
|
int selectDb(client *c, int id);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user