From 412eb67d21705ca31162d614fb8e3a6a568ef5ed Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 29 May 2019 14:21:47 +0800 Subject: [PATCH 01/34] aof: fix assignment for aof_fsync_offset Signed-off-by: Yuan Zhou --- src/aof.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aof.c b/src/aof.c index 4744847d2..c8fb8e8f6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1768,7 +1768,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; - server.aof_current_size = server.aof_current_size; + server.aof_fsync_offset = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ From a0cfd519e3cbbb4c432d25094e4cb202631f131f Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 29 Oct 2019 17:29:06 +0200 Subject: [PATCH 02/34] test infra: improve prints on failed assertions sometimes we have several assertions with the same condition in the same test at different stages, and when these fail (the ones that print the condition text) you don't know which one it was. other assertions didn't print the condition text (variable names), just the expected and unexpected values. So now, all assertions print context line, and conditin text. besides, one of the major differences between 'assert' and 'assert_equal', is that the later is able to print the value that doesn't match the expected. if there is a rare non-reproducible failure, it is helpful to know what was the value the test encountered and how far it was from the threshold. So now, adding assert_lessthan and assert_range that can be used in some places. were we used just 'assert { a > b }' so far. --- tests/support/test.tcl | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/tests/support/test.tcl b/tests/support/test.tcl index 2646acecd..5e8916236 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -11,28 +11,55 @@ proc fail {msg} { proc assert {condition} { if {![uplevel 1 [list expr $condition]]} { - error "assertion:Expected condition '$condition' to be true ([uplevel 1 [list subst -nocommands $condition]])" + set context "(context: [info frame -1])" + error "assertion:Expected [uplevel 1 [list subst -nocommands $condition]] $context" } } proc assert_no_match {pattern value} { if {[string match $pattern $value]} { - error "assertion:Expected '$value' to not match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to not match '$pattern' $context" } } proc assert_match {pattern value} { if {![string match $pattern $value]} { - error "assertion:Expected '$value' to match '$pattern'" + set context "(context: [info frame -1])" + error "assertion:Expected '$value' to match '$pattern' $context" } } -proc assert_equal {expected value {detail ""}} { +proc assert_equal {value expected {detail ""}} { if {$expected ne $value} { if {$detail ne ""} { - set detail " (detail: $detail)" + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" } - error "assertion:Expected '$value' to be equal to '$expected'$detail" + error "assertion:Expected '$value' to be equal to '$expected' $detail" + } +} + +proc assert_lessthan {value expected {detail ""}} { + if {!($value < $expected)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be lessthan to '$expected' $detail" + } +} + +proc assert_range {value min max {detail ""}} { + if {!($value <= $max && $value >= $min)} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -1])" + } + error "assertion:Expected '$value' to be between to '$min' and '$max' $detail" } } From 3adf10b8095475e2170831e036f67e6b2c9c6bdb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 4 Nov 2019 19:30:31 +0200 Subject: [PATCH 03/34] Test coverage for new module APIs: dbsize, flushall, randomkey, lru get/set --- tests/modules/misc.c | 69 +++++++++++++++++++++++++++++++++++ tests/unit/moduleapi/misc.tcl | 19 ++++++++++ 2 files changed, 88 insertions(+) diff --git a/tests/modules/misc.c b/tests/modules/misc.c index fd892f52c..7701a9c7c 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -40,6 +40,65 @@ int test_call_info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int test_flushall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ResetDataset(1, 0); + RedisModule_ReplyWithCString(ctx, "Ok"); + return REDISMODULE_OK; +} + +int test_dbsize(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + long long ll = RedisModule_DbSize(ctx); + RedisModule_ReplyWithLongLong(ctx, ll); + return REDISMODULE_OK; +} + +int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *str = RedisModule_RandomKey(ctx); + RedisModule_ReplyWithString(ctx, str); + RedisModule_FreeString(ctx, str); + return REDISMODULE_OK; +} + +int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru, lfu; + RedisModule_GetLRUOrLFU(key, &lfu, &lru); + RedisModule_ReplyWithLongLong(ctx, lru); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleString *keyname = argv[1]; + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH); + long long lru; + RedisModule_StringToLongLong(argv[2], &lru); + RedisModule_SetLRUOrLFU(key, -1, lru); + RedisModule_ReplyWithCString(ctx, "Ok"); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -50,6 +109,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.call_info", test_call_info,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.flushall", test_flushall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.dbsize", test_dbsize,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.randomkey", test_randomkey,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlru", test_setlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index d392aeab0..ebfa9631f 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -16,4 +16,23 @@ start_server {tags {"modules"}} { assert { [string match "*cmdstat_module*" $info] } } + test {test module db commands} { + r set x foo + set key [r test.randomkey] + assert_equal $key "x" + assert_equal [r test.dbsize] 1 + r test.flushall + assert_equal [r test.dbsize] 0 + } + + test {test modle lru api} { + r set x foo + set lru [r test.getlru x] + assert { $lru <= 1 } + r test.setlru x 100 + set idle [r object idletime x] + assert { $idle >= 100 } + set lru [r test.getlru x] + assert { $lru >= 100 } + } } From 5350e7669e9dfe7b2b4bcf663171920c441c19e1 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Mon, 11 Jul 2016 16:47:37 +0300 Subject: [PATCH 04/34] Add ModuleDataType to/from string serialization. Add two new functions that leverage the RedisModuleDataType mechanism for RDB serialization/deserialization and make it possible to use it to/from arbitrary strings: * RM_SaveDataTypeToString() * RM_LoadDataTypeFromString() --- runtest-moduleapi | 1 + src/module.c | 55 ++++++++++ src/redismodule.h | 4 + tests/modules/Makefile | 3 +- tests/modules/datatype.c | 161 ++++++++++++++++++++++++++++++ tests/unit/moduleapi/datatype.tcl | 27 +++++ 6 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 tests/modules/datatype.c create mode 100644 tests/unit/moduleapi/datatype.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..e5a5a1200 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/datatype "${@}" diff --git a/src/module.c b/src/module.c index ad34e7b64..ca9fb5804 100644 --- a/src/module.c +++ b/src/module.c @@ -3884,6 +3884,59 @@ void RM_DigestEndSequence(RedisModuleDigest *md) { memset(md->o,0,sizeof(md->o)); } +/* Decode a serialized representation of a module data type 'mt' from string + * 'str' and return a newly allocated value, or NULL if decoding failed. + * + * This call basically reuses the 'rdb_load' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + * + * Modules should generally use the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag and + * make sure the de-serialization code properly checks and handles IO errors + * (freeing allocated buffers and returning a NULL). + * + * If this is NOT done, Redis will handle corrupted (or just truncated) serialized + * data by producing an error message and terminating the process. + */ + +void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload, str->ptr); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + + /* All RM_Save*() calls always write a version 2 compatible format, so we + * need to make sure we read the same. + */ + io.ver = 2; + return mt->rdb_load(&io,0); +} + +/* Encode a module data type 'mt' value 'data' into serialized form, and return it + * as a newly allocated RedisModuleString. + * + * This call basically reuses the 'rdb_save' callback which module data types + * implement in order to allow a module to arbitrarily serialize/de-serialize + * keys, similar to how the Redis 'DUMP' and 'RESTORE' commands are implemented. + */ + +RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, const moduleType *mt) { + rio payload; + RedisModuleIO io; + + rioInitWithBuffer(&payload,sdsempty()); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + mt->rdb_save(&io,data); + if (io.error) { + return NULL; + } else { + robj *str = createObject(OBJ_STRING,payload.io.buffer.ptr); + autoMemoryAdd(ctx,REDISMODULE_AM_STRING,str); + return str; + } +} + /* -------------------------------------------------------------------------- * AOF API for modules data types * -------------------------------------------------------------------------- */ @@ -6876,6 +6929,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(LoadFloat); REGISTER_API(SaveLongDouble); REGISTER_API(LoadLongDouble); + REGISTER_API(SaveDataTypeToString); + REGISTER_API(LoadDataTypeFromString); REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..21f3988cf 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -537,6 +537,8 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value) float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_SaveLongDouble)(RedisModuleIO *io, long double value); long double REDISMODULE_API_FUNC(RedisModule_LoadLongDouble)(RedisModuleIO *io); +void *REDISMODULE_API_FUNC(RedisModule_LoadDataTypeFromString)(const RedisModuleString *str, const RedisModuleType *mt); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_SaveDataTypeToString)(RedisModuleCtx *ctx, void *data, const RedisModuleType *mt); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); @@ -745,6 +747,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(LoadFloat); REDISMODULE_GET_API(SaveLongDouble); REDISMODULE_GET_API(LoadLongDouble); + REDISMODULE_GET_API(SaveDataTypeToString); + REDISMODULE_GET_API(LoadDataTypeFromString); REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..26627c30c 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,8 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + datatype.so .PHONY: all diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c new file mode 100644 index 000000000..7c39ab457 --- /dev/null +++ b/tests/modules/datatype.c @@ -0,0 +1,161 @@ +/* This module current tests a small subset but should be extended in the future + * for general ModuleDataType coverage. + */ + +#include "redismodule.h" + +static RedisModuleType *datatype = NULL; + +typedef struct { + long long intval; + RedisModuleString *strval; +} DataType; + +static void *datatype_load(RedisModuleIO *io, int encver) { + (void) encver; + + int intval = RedisModule_LoadSigned(io); + if (RedisModule_IsIOError(io)) return NULL; + + RedisModuleString *strval = RedisModule_LoadString(io); + if (RedisModule_IsIOError(io)) return NULL; + + DataType *dt = (DataType *) RedisModule_Alloc(sizeof(DataType)); + dt->intval = intval; + dt->strval = strval; + return dt; +} + +static void datatype_save(RedisModuleIO *io, void *value) { + DataType *dt = (DataType *) value; + RedisModule_SaveSigned(io, dt->intval); + RedisModule_SaveString(io, dt->strval); +} + +static void datatype_free(void *value) { + if (value) { + DataType *dt = (DataType *) value; + + if (dt->strval) RedisModule_FreeString(NULL, dt->strval); + RedisModule_Free(dt); + } +} + +static int datatype_set(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + long long intval; + + if (RedisModule_StringToLongLong(argv[2], &intval) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid integr value"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + DataType *dt = RedisModule_Calloc(sizeof(DataType), 1); + dt->intval = intval; + dt->strval = argv[3]; + RedisModule_RetainString(ctx, dt->strval); + + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + DataType *dt = RedisModule_LoadDataTypeFromString(argv[2], datatype); + if (!dt) { + RedisModule_ReplyWithError(ctx, "Invalid data"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModule_ModuleTypeSetValue(key, datatype, dt); + RedisModule_CloseKey(key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + + return REDISMODULE_OK; +} + +static int datatype_get(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, dt->intval); + RedisModule_ReplyWithString(ctx, dt->strval); + return REDISMODULE_OK; +} + +static int datatype_dump(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + DataType *dt = RedisModule_ModuleTypeGetValue(key); + RedisModule_CloseKey(key); + + RedisModuleString *reply = RedisModule_SaveDataTypeToString(ctx, dt, datatype); + if (!reply) { + RedisModule_ReplyWithError(ctx, "Failed to save"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithString(ctx, reply); + RedisModule_FreeString(ctx, reply); + return REDISMODULE_OK; +} + + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"datatype",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); + + RedisModuleTypeMethods datatype_methods = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = datatype_load, + .rdb_save = datatype_save, + .free = datatype_free, + }; + + datatype = RedisModule_CreateDataType(ctx, "test___dt", 1, &datatype_methods); + if (datatype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.set", datatype_set,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.get", datatype_get,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.restore", datatype_restore,"deny-oom",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"datatype.dump", datatype_dump,"",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl new file mode 100644 index 000000000..c1da696d3 --- /dev/null +++ b/tests/unit/moduleapi/datatype.tcl @@ -0,0 +1,27 @@ +set testmodule [file normalize tests/modules/datatype.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {DataType: Test module is sane, GET/SET work.} { + r datatype.set dtkey 100 stringval + assert {[r datatype.get dtkey] eq {100 stringval}} + } + + test {DataType: RM_SaveDataTypeToString(), RM_LoadDataTypeFromString() work} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + + r datatype.restore dtkeycopy $encoded + assert {[r datatype.get dtkeycopy] eq {-1111 MyString}} + } + + test {DataType: Handle truncated RM_LoadDataTypeFromString()} { + r datatype.set dtkey -1111 MyString + set encoded [r datatype.dump dtkey] + set truncated [string range $encoded 0 end-1] + + catch {r datatype.restore dtkeycopy $truncated} e + set e + } {*Invalid*} +} From e542132b07a76c73cd5e1dd067671afbb4c53fe6 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Mon, 4 Nov 2019 20:32:19 +0800 Subject: [PATCH 05/34] expires: refactoring judgment about whether a key is expired Calling lookupKey*() many times to search a key in one command may get different result. That's because lookupKey*() calls expireIfNeeded(), and delete the key when reach the expire time. So we can get an robj before the expire time, but a NULL after the expire time. The worst is that may lead to Redis crash, for example `RPOPLPUSH foo foo` the first time we get a list form `foo` and hold the pointer, but when we get `foo` again it's expired and deleted. Now we hold a freed memory, when execute rpoplpushHandlePush() redis crash. To fix it, we can refactor the judgment about whether a key is expired, using the same basetime `server.cmd_start_mstime` instead of calling mstime() everytime. --- src/db.c | 2 +- src/server.c | 1 + src/server.h | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 2c0a0cdd3..1a272faae 100644 --- a/src/db.c +++ b/src/db.c @@ -1199,7 +1199,7 @@ int keyIsExpired(redisDb *db, robj *key) { * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : mstime(); + mstime_t now = server.lua_caller ? server.lua_time_start : server.cmd_start_mstime; return now > when; } diff --git a/src/server.c b/src/server.c index 8f165113d..99438ccac 100644 --- a/src/server.c +++ b/src/server.c @@ -3596,6 +3596,7 @@ int processCommand(client *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { + server.cmd_start_mstime = mstime(); call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) diff --git a/src/server.h b/src/server.h index f724f7d64..08eb2eef9 100644 --- a/src/server.h +++ b/src/server.h @@ -1401,6 +1401,7 @@ struct redisServer { time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ long long mstime; /* 'unixtime' with milliseconds resolution. */ + mstime_t cmd_start_mstime; /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ From 824f5f0b7a1f32a6e6abf3d355d523d2c6e7bf09 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 5 Nov 2019 10:14:34 +0100 Subject: [PATCH 06/34] Update PR #6537 patch to for generality. After the thread in #6537 and thanks to the suggestions received, this commit updates the original patch in order to: 1. Solve the problem of updating the time in multiple places by updating it in call(). 2. Avoid introducing a new field but use our cached time. This required some minor refactoring to the function updating the time, and the introduction of a new cached time in microseconds in order to use less gettimeofday() calls. --- src/db.c | 8 ++++++-- src/rdb.c | 2 +- src/server.c | 36 +++++++++++++++++++++++------------- src/server.h | 7 ++++--- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/db.c b/src/db.c index 1a272faae..6c0b190a8 100644 --- a/src/db.c +++ b/src/db.c @@ -1198,8 +1198,12 @@ int keyIsExpired(redisDb *db, robj *key) { * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. */ - mstime_t now = server.lua_caller ? server.lua_time_start : server.cmd_start_mstime; + * See issue #1525 on Github for more information. + * + * Outside the Lua script execution, we use the cached time server.mstime + * that is updated before commands executions in call(). */ + mstime_t now = server.lua_caller ? server.lua_time_start : + server.mstime; return now > when; } diff --git a/src/rdb.c b/src/rdb.c index f530219a4..a499362d5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1961,7 +1961,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* The DB can take some non trivial amount of time to load. Update * our cached time since it is used to create and update the last * interaction time with clients and for other important things. */ - updateCachedTime(); + updateCachedTime(0); if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); diff --git a/src/server.c b/src/server.c index 99438ccac..2e5d8193b 100644 --- a/src/server.c +++ b/src/server.c @@ -1736,20 +1736,29 @@ void databasesCron(void) { /* We take a cached value of the unix time in the global state because with * virtual memory and aging there is to store the current time in objects at * every object access, and accuracy is not needed. To access a global var is - * a lot faster than calling time(NULL) */ -void updateCachedTime(void) { - server.unixtime = time(NULL); - server.mstime = mstime(); + * a lot faster than calling time(NULL). + * + * This function should be fast because it is called at every command execution + * in call(), so it is possible to decide if to update the daylight saving + * info or not using the 'update_daylight_info' argument. Normally we update + * such info only when calling this function from serverCron() but not when + * calling it from call(). */ +void updateCachedTime(int update_daylight_info) { + server.ustime = ustime(); + server.mstime = server.ustime / 1000; + server.unixtime = server.mstime / 1000; /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this * context is safe since we will never fork() while here, in the main * thread. The logging function will call a thread safe version of * localtime that has no locks. */ - struct tm tm; - time_t ut = server.unixtime; - localtime_r(&ut,&tm); - server.daylight_active = tm.tm_isdst; + if (update_daylight_info) { + struct tm tm; + time_t ut = server.unixtime; + localtime_r(&ut,&tm); + server.daylight_active = tm.tm_isdst; + } } void checkChildrenDone(void) { @@ -1838,7 +1847,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* Update the time cache. */ - updateCachedTime(); + updateCachedTime(1); server.hz = server.config_hz; /* Adapt the server.hz value to the number of configured clients. If we have @@ -2252,7 +2261,7 @@ void createSharedObjects(void) { void initServerConfig(void) { int j; - updateCachedTime(); + updateCachedTime(1); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); @@ -3238,7 +3247,8 @@ void preventCommandReplication(client *c) { * */ void call(client *c, int flags) { - long long dirty, start, duration; + long long dirty; + ustime_t start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; @@ -3259,7 +3269,8 @@ void call(client *c, int flags) { /* Call the command. */ dirty = server.dirty; - start = ustime(); + updateCachedTime(0); + start = server.ustime; c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; @@ -3596,7 +3607,6 @@ int processCommand(client *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { - server.cmd_start_mstime = mstime(); call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) diff --git a/src/server.h b/src/server.h index 08eb2eef9..24dbc079c 100644 --- a/src/server.h +++ b/src/server.h @@ -50,6 +50,7 @@ #include typedef long long mstime_t; /* millisecond time type. */ +typedef long long ustime_t; /* microsecond time type. */ #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -1400,8 +1401,8 @@ struct redisServer { _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ time_t timezone; /* Cached timezone. As set by tzset(). */ int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* 'unixtime' with milliseconds resolution. */ - mstime_t cmd_start_mstime; + mstime_t mstime; /* 'unixtime' in milliseconds. */ + ustime_t ustime; /* 'unixtime' in microseconds. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1997,7 +1998,7 @@ void populateCommandTable(void); void resetCommandTableStats(void); void adjustOpenFilesLimit(void); void closeListeningSockets(int unlink_unix_socket); -void updateCachedTime(void); +void updateCachedTime(int update_daylight_info); void resetServerStats(void); void activeDefragCycle(void); unsigned int getLRUClock(void); From 8b2c0f90442c0646d7265ef150dd5afa3172b86e Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 6 Nov 2019 09:57:29 +0100 Subject: [PATCH 07/34] Update PR #6537: use a fresh time outside call(). One problem with the solution proposed so far in #6537 is that key lookups outside a command execution via call(), still used a cached time. The cached time needed to be refreshed in multiple places, especially because of modules callbacks from timers, cluster bus, and thread safe contexts, that may use RM_Open(). In order to avoid this problem, this commit introduces the ability to detect if we are inside call(): this way we can use the reference fixed time only when we are in the context of a command execution or Lua script, but for the asynchronous lookups, we can still use mstime() to get a fresh time reference. --- src/db.c | 27 +++++++++++++++++++++------ src/server.c | 4 ++++ src/server.h | 3 ++- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/db.c b/src/db.c index 6c0b190a8..5ea3087fc 100644 --- a/src/db.c +++ b/src/db.c @@ -1188,6 +1188,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { /* Check if the key is expired. */ int keyIsExpired(redisDb *db, robj *key) { mstime_t when = getExpire(db,key); + mstime_t now; if (when < 0) return 0; /* No expire for this key */ @@ -1198,13 +1199,27 @@ int keyIsExpired(redisDb *db, robj *key) { * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. - * See issue #1525 on Github for more information. - * - * Outside the Lua script execution, we use the cached time server.mstime - * that is updated before commands executions in call(). */ - mstime_t now = server.lua_caller ? server.lua_time_start : - server.mstime; + * See issue #1525 on Github for more information. */ + if (server.lua_caller) { + now = server.lua_time_start; + } + /* If we are in the middle of a command execution, we still want to use + * a reference time that does not change: in that case we just use the + * cached time, that we update before each call in the call() function. + * This way we avoid that commands such as RPOPLPUSH or similar, that + * may re-open the same key multiple times, can invalidate an already + * open object in a next call, if the next call will see the key expired, + * while the first did not. */ + else if (server.call_depth > 0) { + now = server.mstime; + } + /* For the other cases, we want to use the most fresh time we have. */ + else { + now = mstime(); + } + /* The key expired if the current (virtual or real) time is greater + * than the expire time of the key. */ return now > when; } diff --git a/src/server.c b/src/server.c index 2e5d8193b..acf8eebbc 100644 --- a/src/server.c +++ b/src/server.c @@ -2780,6 +2780,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; + server.call_depth = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3252,6 +3253,8 @@ void call(client *c, int flags) { int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; + server.call_depth++; + /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && @@ -3377,6 +3380,7 @@ void call(client *c, int flags) { trackingRememberKeys(caller); } + server.call_depth--; server.stat_numcommands++; } diff --git a/src/server.h b/src/server.h index 24dbc079c..4a497c47c 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,8 @@ struct redisServer { list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ - client *current_client; /* Current client, only used on crash report */ + client *current_client; /* Current client executing the command. */ + long call_depth; /* call() re-entering count. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ From 9593ffde2ed92af512c5d33a2e8e6b9fed516d8c Mon Sep 17 00:00:00 2001 From: Patrick Valsecchi Date: Thu, 7 Nov 2019 08:49:19 +0100 Subject: [PATCH 08/34] Redis sentinel kill pubsub client connections as well When a redis instance becomes a slave, sentinel also kills pubsub clients. Closes #6545 --- src/sentinel.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 0490db4e9..d5f22b97f 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -465,6 +465,12 @@ struct redisCommand sentinelcmds[] = { {"hello",helloCommand,-2,"no-script fast",0,NULL,0,0,0,0,0} }; +/* List of client types that are killed when an instance becomes a slave */ +const char* killedClientTypes[] = { + "normal", + "pubsub" +}; + /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -3949,6 +3955,7 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { char portstr[32]; int retval; + unsigned int curType; ll2string(portstr,sizeof(portstr),port); @@ -3993,11 +4000,14 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - retval = redisAsyncCommand(ri->link->cc, - sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal", - sentinelInstanceMapCommand(ri,"CLIENT")); - if (retval == C_ERR) return retval; - ri->link->pending_commands++; + for (curType = 0; curType < sizeof(killedClientTypes)/sizeof(killedClientTypes[0]); ++curType) { + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", + sentinelInstanceMapCommand(ri,"CLIENT"), + killedClientTypes[curType]); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + } retval = redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s", From 7059eceeb027d9ffb7cf9ae47201c8554d6e5010 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 8 Nov 2019 19:06:51 +0800 Subject: [PATCH 09/34] expires & blocking: handle ready keys as call() --- src/blocked.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/blocked.c b/src/blocked.c index 14c2ff830..dea4cc57a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -514,6 +514,9 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + server.call_depth++; + updateCachedTime(0); + /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); @@ -530,6 +533,8 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnKeyByModule(rl); } + server.call_depth++; + /* Free this item. */ decrRefCount(rl->key); zfree(rl); From 02f21113ab6c61d2c01544607b464eb501dbe8fa Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 Nov 2019 09:21:19 +0200 Subject: [PATCH 10/34] fix leak in module api rdb test recently added more reads into that function, if a later read fails, i must either free what's already allocated, or return the pointer so that the free callback will release it. --- tests/modules/testrdb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c index 8a262e8a7..7c04bb4ef 100644 --- a/tests/modules/testrdb.c +++ b/tests/modules/testrdb.c @@ -18,8 +18,11 @@ void *testrdb_type_load(RedisModuleIO *rdb, int encver) { RedisModuleString *str = RedisModule_LoadString(rdb); float f = RedisModule_LoadFloat(rdb); long double ld = RedisModule_LoadLongDouble(rdb); - if (RedisModule_IsIOError(rdb)) + if (RedisModule_IsIOError(rdb)) { + RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); + RedisModule_FreeString(ctx, str); return NULL; + } /* Using the values only after checking for io errors. */ assert(count==1); assert(encver==1); From 28c20b4ef95aa5f74b938681a6f78f6b92dec2a8 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 Nov 2019 09:04:39 +0200 Subject: [PATCH 11/34] rename RN_SetLRUOrLFU -> RM_SetLRU and RN_SetLFU - the API name was odd, separated to two apis one for LRU and one for LFU - the LRU idle time was in 1 second resolution, which might be ok for RDB and RESTORE, but i think modules may need higher resolution - adding tests for LFU and for handling maxmemory policy mismatch --- src/cluster.c | 2 +- src/module.c | 64 +++++++++++++++++++++------------ src/object.c | 4 +-- src/rdb.c | 2 +- src/redismodule.h | 12 ++++--- src/server.h | 2 +- tests/modules/misc.c | 66 +++++++++++++++++++++++++++++------ tests/unit/moduleapi/misc.tcl | 33 ++++++++++++++++-- 8 files changed, 141 insertions(+), 44 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a7d8a02c3..9e6ddb2c4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4966,7 +4966,7 @@ void restoreCommand(client *c) { if (!absttl) ttl+=mstime(); setExpire(c,c->db,c->argv[1],ttl); } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; diff --git a/src/module.c b/src/module.c index ad34e7b64..cd79a5908 100644 --- a/src/module.c +++ b/src/module.c @@ -6736,35 +6736,53 @@ size_t moduleCount(void) { return dictSize(modules); } -/* Set the key LRU/LFU depending on server.maxmemory_policy. - * The lru_idle arg is idle time in seconds, and is only relevant if the - * eviction policy is LRU based. - * The lfu_freq arg is a logarithmic counter that provides an indication of - * the access frequencyonly (must be <= 255) and is only relevant if the - * eviction policy is LFU based. - * Either or both of them may be <0, in that case, nothing is set. */ -/* return value is an indication if the lru field was updated or not. */ -int RM_SetLRUOrLFU(RedisModuleKey *key, long long lfu_freq, long long lru_idle) { +/* Set the key last access time for LRU based eviction. not relevent if the + * servers's maxmemory policy is LFU based. Value is idle time in milliseconds. + * returns REDISMODULE_OK if the LRU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLRU(RedisModuleKey *key, mstime_t lru_idle) { if (!key->value) return REDISMODULE_ERR; - if (objectSetLRUOrLFU(key->value, lfu_freq, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0)) + if (objectSetLRUOrLFU(key->value, -1, lru_idle, lru_idle>=0 ? LRU_CLOCK() : 0, 1)) return REDISMODULE_OK; return REDISMODULE_ERR; } -/* Gets the key LRU or LFU (depending on the current eviction policy). - * One will be set to the appropiate return value, and the other will be set to -1. - * see RedisModule_SetLRUOrLFU for units and ranges. - * return value is an indication of success. */ -int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle) { - *lru_idle = *lfu_freq = -1; +/* Gets the key last access time. + * Value is idletime in milliseconds or -1 if the server's eviction policy is + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLRU(RedisModuleKey *key, mstime_t *lru_idle) { + *lru_idle = -1; if (!key->value) return REDISMODULE_ERR; - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) + return REDISMODULE_OK; + *lru_idle = estimateObjectIdleTime(key->value); + return REDISMODULE_OK; +} + +/* Set the key access frequency. only relevant if the server's maxmemory policy + * is LFU based. + * The frequency is a logarithmic counter that provides an indication of + * the access frequencyonly (must be <= 255). + * returns REDISMODULE_OK if the LFU was updated, REDISMODULE_ERR otherwise. */ +int RM_SetLFU(RedisModuleKey *key, long long lfu_freq) { + if (!key->value) + return REDISMODULE_ERR; + if (objectSetLRUOrLFU(key->value, lfu_freq, -1, 0, 1)) + return REDISMODULE_OK; + return REDISMODULE_ERR; +} + +/* Gets the key access frequency or -1 if the server's eviction policy is not + * LFU based. + * returns REDISMODULE_OK if when key is valid. */ +int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { + *lfu_freq = -1; + if (!key->value) + return REDISMODULE_ERR; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) *lfu_freq = LFUDecrAndReturn(key->value); - } else { - *lru_idle = estimateObjectIdleTime(key->value)/1000; - } return REDISMODULE_OK; } @@ -6966,8 +6984,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetClientInfoById); REGISTER_API(PublishMessage); REGISTER_API(SubscribeToServerEvent); - REGISTER_API(SetLRUOrLFU); - REGISTER_API(GetLRUOrLFU); + REGISTER_API(SetLRU); + REGISTER_API(GetLRU); + REGISTER_API(SetLFU); + REGISTER_API(GetLFU); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); diff --git a/src/object.c b/src/object.c index 5e9a99dec..c6d89bfa7 100644 --- a/src/object.c +++ b/src/object.c @@ -1210,7 +1210,7 @@ sds getMemoryDoctorReport(void) { * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock) { + long long lru_clock, int lru_multiplier) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (lfu_freq >= 0) { serverAssert(lfu_freq <= 255); @@ -1222,7 +1222,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, * according to the LRU clock resolution this Redis * instance was compiled with (normally 1000 ms, so the * below statement will expand to lru_idle*1000/1000. */ - lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION; + lru_idle = lru_idle*lru_multiplier/LRU_CLOCK_RESOLUTION; long lru_abs = lru_clock - lru_idle; /* Absolute access time. */ /* If the LRU field underflows (since LRU it is a wrapping * clock), the best we can do is to provide a large enough LRU diff --git a/src/rdb.c b/src/rdb.c index b569edfea..301a33642 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2239,7 +2239,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (expiretime != -1) setExpire(NULL,db,key,expiretime); /* Set usage information (for eviction). */ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); + objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); /* Decrement the key refcount since dbAdd() will take its * own reference. */ diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..e388d7439 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -583,8 +583,10 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); -int REDISMODULE_API_FUNC(RedisModule_SetLRUOrLFU)(RedisModuleKey *key, long long lfu_freq, long long lru_idle); -int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLRU)(RedisModuleKey *key, mstime_t lru_idle); +int REDISMODULE_API_FUNC(RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idle); +int REDISMODULE_API_FUNC(RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq); +int REDISMODULE_API_FUNC(RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq); RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); @@ -794,8 +796,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetClientInfoById); REDISMODULE_GET_API(PublishMessage); REDISMODULE_GET_API(SubscribeToServerEvent); - REDISMODULE_GET_API(SetLRUOrLFU); - REDISMODULE_GET_API(GetLRUOrLFU); + REDISMODULE_GET_API(SetLRU); + REDISMODULE_GET_API(GetLRU); + REDISMODULE_GET_API(SetLFU); + REDISMODULE_GET_API(GetLFU); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); diff --git a/src/server.h b/src/server.h index 8063dc101..c9ac3003e 100644 --- a/src/server.h +++ b/src/server.h @@ -2089,7 +2089,7 @@ robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags); robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, - long long lru_clock); + long long lru_clock, int lru_multiplier); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) void dbAdd(redisDb *db, robj *key, robj *val); diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 7701a9c7c..06b5af620 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -68,16 +68,24 @@ int test_randomkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +RedisModuleKey *open_key_or_reply(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + if (!key) { + RedisModule_ReplyWithError(ctx, "key not found"); + return NULL; + } + return key; +} + int test_getlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc<2) { RedisModule_WrongArity(ctx); return REDISMODULE_OK; } - RedisModuleString *keyname = argv[1]; - RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); - long long lru, lfu; - RedisModule_GetLRUOrLFU(key, &lfu, &lru); + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + RedisModule_GetLRU(key, &lru); RedisModule_ReplyWithLongLong(ctx, lru); RedisModule_CloseKey(key); return REDISMODULE_OK; @@ -89,12 +97,46 @@ int test_setlru(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_WrongArity(ctx); return REDISMODULE_OK; } - RedisModuleString *keyname = argv[1]; - RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_WRITE|REDISMODULE_OPEN_KEY_NOTOUCH); - long long lru; - RedisModule_StringToLongLong(argv[2], &lru); - RedisModule_SetLRUOrLFU(key, -1, lru); - RedisModule_ReplyWithCString(ctx, "Ok"); + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lru; + if (RedisModule_StringToLongLong(argv[2], &lru) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid idle time"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLRU(key, lru)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_getlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + RedisModule_GetLFU(key, &lfu); + RedisModule_ReplyWithLongLong(ctx, lfu); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int test_setlfu(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc<3) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + RedisModuleKey *key = open_key_or_reply(ctx, argv[1], REDISMODULE_READ|REDISMODULE_OPEN_KEY_NOTOUCH); + mstime_t lfu; + if (RedisModule_StringToLongLong(argv[2], &lfu) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "invalid freq"); + return REDISMODULE_OK; + } + int was_set = RedisModule_SetLFU(key, lfu)==REDISMODULE_OK; + RedisModule_ReplyWithLongLong(ctx, was_set); RedisModule_CloseKey(key); return REDISMODULE_OK; } @@ -119,6 +161,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"test.getlru", test_getlru,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.setlfu", test_setlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"test.getlfu", test_getlfu,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index ebfa9631f..376bc2eed 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -26,13 +26,40 @@ start_server {tags {"modules"}} { } test {test modle lru api} { + r config set maxmemory-policy allkeys-lru r set x foo set lru [r test.getlru x] - assert { $lru <= 1 } - r test.setlru x 100 + assert { $lru <= 1000 } + set was_set [r test.setlru x 100000] + assert { $was_set == 1 } set idle [r object idletime x] assert { $idle >= 100 } set lru [r test.getlru x] - assert { $lru >= 100 } + assert { $lru >= 100000 } + r config set maxmemory-policy allkeys-lfu + set lru [r test.getlru x] + assert { $lru == -1 } + set was_set [r test.setlru x 100000] + assert { $was_set == 0 } } + r config set maxmemory-policy allkeys-lru + + test {test modle lfu api} { + r config set maxmemory-policy allkeys-lfu + r set x foo + set lfu [r test.getlfu x] + assert { $lfu >= 1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 1 } + set freq [r object freq x] + assert { $freq <= 100 } + set lfu [r test.getlfu x] + assert { $lfu <= 100 } + r config set maxmemory-policy allkeys-lru + set lfu [r test.getlfu x] + assert { $lfu == -1 } + set was_set [r test.setlfu x 100] + assert { $was_set == 0 } + } + } From 11c6ce812aa32cf6a6011697cbfe8881ff9450fa Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Thu, 17 Oct 2019 15:37:01 +0300 Subject: [PATCH 12/34] Added scan implementation to module api. The implementation expose the following new functions: 1. RedisModule_CursorCreate - allow to create a new cursor object for keys scanning 2. RedisModule_CursorRestart - restart an existing cursor to restart the scan 3. RedisModule_CursorDestroy - destroy an existing cursor 4. RedisModule_Scan - scan keys The RedisModule_Scan function gets a cursor object, a callback and void* (used as user private data). The callback will be called for each key in the database proving the key name and the value as RedisModuleKey. --- runtest-moduleapi | 1 + src/module.c | 133 +++++++++++++++++++++++++++++++--- src/redismodule.h | 10 +++ tests/modules/Makefile | 3 +- tests/modules/scan.c | 62 ++++++++++++++++ tests/unit/moduleapi/scan.tcl | 18 +++++ 6 files changed, 215 insertions(+), 12 deletions(-) create mode 100644 tests/modules/scan.c create mode 100644 tests/unit/moduleapi/scan.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..3eb6b21b2 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/scan \ "${@}" diff --git a/src/module.c b/src/module.c index ad34e7b64..5758abbb6 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ + kp->ctx = ctx; + kp->db = ctx->client->db; + kp->key = keyname; + incrRefCount(keyname); + kp->value = value; + kp->iter = NULL; + kp->mode = mode; + zsetKeyReset(kp); +} + /* Return an handle representing a Redis key, so that it is possible * to call other APIs with the key handle as argument to perform * operations on the key. @@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - kp->ctx = ctx; - kp->db = ctx->client->db; - kp->key = keyname; - incrRefCount(keyname); - kp->value = value; - kp->iter = NULL; - kp->mode = mode; - zsetKeyReset(kp); + initializeKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -/* Close a key handle. */ -void RM_CloseKey(RedisModuleKey *key) { - if (key == NULL) return; +static void closeKeyInternal(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); +} + +/* Close a key handle. */ +void RM_CloseKey(RedisModuleKey *key) { + if (key == NULL) return; + closeKeyInternal(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/** + * Callback for scan implementation. + * + * The keyname is owned by the caller and need to be retained if used after this function. + * + * The kp is the data and provide using the best efforts approach, in some cases it might + * not be available (in such case it will be set to NULL) and it is the user responsibility + * to handle it. + * + * The kp (if given) is owned by the caller and will be free when the callback returns + * + */ +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); + +typedef struct { + RedisModuleCtx *ctx; + void* user_data; + RedisModuleScanCB fn; +} ScanCBData; + +typedef struct RedisModuleCursor{ + int cursor; +}RedisModuleCursor; + +void ScanCallback(void *privdata, const dictEntry *de) { + ScanCBData *data = privdata; + sds key = dictGetKey(de); + robj* val = dictGetVal(de); + RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key)); + + /* Setup the key handle. */ + RedisModuleKey kp = {0}; + initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + + data->fn(data->user_data, keyname, &kp); + + closeKeyInternal(&kp); + decrRefCount(keyname); +} + +/** + * Create a new cursor to scan keys. + */ +RedisModuleCursor* RM_CursorCreate() { + RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); + cursor->cursor = 0; + return cursor; +} + +/** + * Restart an existing cursor. The keys will be rescanned. + */ +void RM_CursorRestart(RedisModuleCursor* cursor) { + cursor->cursor = 0; +} + +/** + * Destroy the cursor struct. + */ +void RM_CursorDestroy(RedisModuleCursor* cursor) { + zfree(cursor); +} + +/** + * Scan api that allows module writer to scan all the keys and value in redis. + * The way it should be used: + * Cursor* c = RedisModule_CursorCreate(); + * while(RedisModule_Scan(ctx, c, callback, privateData)); + * RedisModule_CursorDestroy(c); + * + * It is also possible to use this api from another thread such that the GIL only have to + * be acquired durring the actuall call to RM_Scan: + * Cursor* c = RedisModule_CursorCreate(); + * RedisModule_ThreadSafeCtxLock(ctx); + * while(RedisModule_Scan(ctx, c, callback, privateData)){ + * RedisModule_ThreadSafeCtxUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeCtxLock(ctx); + * } + * RedisModule_CursorDestroy(c); + * + * The function will return 1 if there is more elements to scan and 0 otherwise. + * It is also possible to restart and existing cursor using RM_CursorRestart + */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { + if(cursor->cursor == -1){ + return 0; + } + int ret = 1; + ScanCBData data = { ctx, privdata, fn }; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); + if (cursor->cursor == 0){ + cursor->cursor = -1; + ret = 0; + } + return ret; +} + + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); + REGISTER_API(Scan); + REGISTER_API(CursorCreate); + REGISTER_API(CursorDestroy); + REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..c74772d0f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; +typedef struct RedisModuleCursor RedisModuleCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -409,6 +410,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -633,6 +635,10 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); +RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor); +void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -842,6 +848,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(CursorCreate); + REDISMODULE_GET_API(CursorRestart); + REDISMODULE_GET_API(CursorDestroy); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..07c3cb829 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,8 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + scan.so .PHONY: all diff --git a/tests/modules/scan.c b/tests/modules/scan.c new file mode 100644 index 000000000..21071720a --- /dev/null +++ b/tests/modules/scan.c @@ -0,0 +1,62 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define UNUSED(V) ((void) V) + +typedef struct scan_pd{ + size_t nkeys; + RedisModuleCtx *ctx; +} scan_pd; + +void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){ + scan_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + + RedisModule_ReplyWithString(pd->ctx, keyname); + if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){ + size_t len; + char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); + RedisModule_ReplyWithStringBuffer(pd->ctx, data, len); + }else{ + RedisModule_ReplyWithNull(pd->ctx); + } + pd->nkeys++; +} + +int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + scan_pd pd = { + .nkeys = 0, + .ctx = ctx, + }; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleCursor* cursor = RedisModule_CursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_callback, &pd)); + RedisModule_CursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nkeys); + return 0; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} + + + + + diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl new file mode 100644 index 000000000..5a77e8195 --- /dev/null +++ b/tests/unit/moduleapi/scan.tcl @@ -0,0 +1,18 @@ +set testmodule [file normalize tests/modules/scan.so] + +proc count_log_message {pattern} { + set result [exec grep -c $pattern < [srv 0 stdout]] +} + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module scan} { + # the module create a scan command which also return values + r set x 1 + r set y 2 + r set z 3 + lsort [r scan.scankeysvalues] + } {{x 1} {y 2} {z 3}} + +} \ No newline at end of file From 0f8692b4646013b7d98d4b21f86da0686546d43a Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 11 Nov 2019 13:30:37 +0200 Subject: [PATCH 13/34] Add RM_ScanKey to scan hash, set, zset, changes to RM_Scan API - Adding RM_ScanKey - Adding tests for RM_ScanKey - Refactoring RM_Scan API Changes in RM_Scan - cleanup in docs and coding convention - Moving out of experimantal Api - Adding ctx to scan callback - Dont use cursor of -1 as an indication of done (can be a valid cursor) - Set errno when returning 0 for various reasons - Rename Cursor to ScanCursor - Test filters key that are not strings, and opens a key if NULL --- src/module.c | 255 ++++++++++++++++++++++++++-------- src/redismodule.h | 23 +-- tests/modules/scan.c | 107 ++++++++++---- tests/unit/moduleapi/scan.tcl | 45 ++++-- 4 files changed, 323 insertions(+), 107 deletions(-) diff --git a/src/module.c b/src/module.c index 5758abbb6..b92a9e692 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,7 +1848,8 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } -static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ +/* Initialize a RedisModuleKey struct */ +static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ kp->ctx = ctx; kp->db = ctx->client->db; kp->key = keyname; @@ -1889,12 +1890,13 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - initializeKey(kp, ctx, keyname, value, mode); + moduleInitKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -static void closeKeyInternal(RedisModuleKey *key) { +/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */ +static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); @@ -1906,7 +1908,7 @@ static void closeKeyInternal(RedisModuleKey *key) { /* Close a key handle. */ void RM_CloseKey(RedisModuleKey *key) { if (key == NULL) return; - closeKeyInternal(key); + moduleCloseKey(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5899,31 +5901,23 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } -/** - * Callback for scan implementation. - * - * The keyname is owned by the caller and need to be retained if used after this function. - * - * The kp is the data and provide using the best efforts approach, in some cases it might - * not be available (in such case it will be set to NULL) and it is the user responsibility - * to handle it. - * - * The kp (if given) is owned by the caller and will be free when the callback returns - * - */ -typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); +/* -------------------------------------------------------------------------- + * Scanning keyspace and hashes + * -------------------------------------------------------------------------- */ +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); typedef struct { RedisModuleCtx *ctx; void* user_data; RedisModuleScanCB fn; } ScanCBData; -typedef struct RedisModuleCursor{ +typedef struct RedisModuleScanCursor{ int cursor; -}RedisModuleCursor; + int done; +}RedisModuleScanCursor; -void ScanCallback(void *privdata, const dictEntry *de) { +static void moduleScanCallback(void *privdata, const dictEntry *de) { ScanCBData *data = privdata; sds key = dictGetKey(de); robj* val = dictGetVal(de); @@ -5931,69 +5925,211 @@ void ScanCallback(void *privdata, const dictEntry *de) { /* Setup the key handle. */ RedisModuleKey kp = {0}; - initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + moduleInitKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); - data->fn(data->user_data, keyname, &kp); + data->fn(data->ctx, keyname, &kp, data->user_data); - closeKeyInternal(&kp); + moduleCloseKey(&kp); decrRefCount(keyname); } -/** - * Create a new cursor to scan keys. - */ -RedisModuleCursor* RM_CursorCreate() { - RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); +/* Create a new cursor to be used with RedisModule_Scan */ +RedisModuleScanCursor *RM_ScanCursorCreate() { + RedisModuleScanCursor* cursor = zmalloc(sizeof(*cursor)); cursor->cursor = 0; + cursor->done = 0; return cursor; } -/** - * Restart an existing cursor. The keys will be rescanned. - */ -void RM_CursorRestart(RedisModuleCursor* cursor) { +/* Restart an existing cursor. The keys will be rescanned. */ +void RM_ScanCursorRestart(RedisModuleScanCursor *cursor) { cursor->cursor = 0; + cursor->done = 0; } -/** - * Destroy the cursor struct. - */ -void RM_CursorDestroy(RedisModuleCursor* cursor) { +/* Destroy the cursor struct. */ +void RM_ScanCursorDestroy(RedisModuleScanCursor *cursor) { zfree(cursor); } -/** - * Scan api that allows module writer to scan all the keys and value in redis. +/* Scan api that allows a module to scan all the keys and value in the selected db. + * + * Callback for scan implementation. + * void scan_callback(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); + * - ctx - the redis module context provided to for the scan. + * - keyname - owned by the caller and need to be retained if used after this function. + * - key - holds info on the key and value, it is provided as best effort, in some cases it might + * be NULL, in which case the user should (can) use RedisModule_OpenKey (and CloseKey too). + * when it is provided, it is owned by the caller and will be free when the callback returns. + * - privdata - the user data provided to RedisModule_Scan. + * * The way it should be used: - * Cursor* c = RedisModule_CursorCreate(); + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); * while(RedisModule_Scan(ctx, c, callback, privateData)); - * RedisModule_CursorDestroy(c); + * RedisModule_ScanCursorDestroy(c); * - * It is also possible to use this api from another thread such that the GIL only have to - * be acquired durring the actuall call to RM_Scan: - * Cursor* c = RedisModule_CursorCreate(); - * RedisModule_ThreadSafeCtxLock(ctx); + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); * while(RedisModule_Scan(ctx, c, callback, privateData)){ - * RedisModule_ThreadSafeCtxUnlock(ctx); + * RedisModule_ThreadSafeContextUnlock(ctx); * // do some background job - * RedisModule_ThreadSafeCtxLock(ctx); + * RedisModule_ThreadSafeContextLock(ctx); * } - * RedisModule_CursorDestroy(c); + * RedisModule_ScanCursorDestroy(c); * - * The function will return 1 if there is more elements to scan and 0 otherwise. - * It is also possible to restart and existing cursor using RM_CursorRestart - */ -int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { - if(cursor->cursor == -1){ + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) { + if (cursor->done) { + errno = ENOENT; return 0; } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); - if (cursor->cursor == 0){ - cursor->cursor = -1; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; ret = 0; } + errno = 0; + return ret; +} + +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); +typedef struct { + RedisModuleKey *key; + void* user_data; + RedisModuleScanKeyCB fn; +} ScanKeyCBData; + +static void moduleScanKeyCallback(void *privdata, const dictEntry *de) { + ScanKeyCBData *data = privdata; + sds key = dictGetKey(de); + robj *o = data->key->value; + robj *field = createStringObject(key, sdslen(key)); + robj *value = NULL; + if (o->type == OBJ_SET) { + value = NULL; + } else if (o->type == OBJ_HASH) { + sds val = dictGetVal(de); + value = createStringObject(val, sdslen(val)); + } else if (o->type == OBJ_ZSET) { + double *val = (double*)dictGetVal(de); + value = createStringObjectFromLongDouble(*val, 0); + } + + data->fn(data->key, field, value, data->user_data); + decrRefCount(field); + if (value) decrRefCount(value); +} + +/* Scan api that allows a module to scan the elements in a hash, set or sorted set key + * + * Callback for scan implementation. + * void scan_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata); + * - key - the redis key context provided to for the scan. + * - field - field name, owned by the caller and need to be retained if used + * after this function. + * - value - value string or NULL for set type, owned by the caller and need to + * be retained if used after this function. + * - privdata - the user data provided to RedisModule_ScanKey. + * + * The way it should be used: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(key, c, callback, privateData)); + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * It is also possible to use this API from another thread while the lock is acquired durring + * the actuall call to RM_Scan, and re-opening the key each time: + * RedisModuleCursor *c = RedisModule_ScanCursorCreate(); + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * while(RedisModule_ScanKey(ctx, c, callback, privateData)){ + * RedisModule_CloseKey(key); + * RedisModule_ThreadSafeContextUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeContextLock(ctx); + * RedisModuleKey *key = RedisModule_OpenKey(...) + * } + * RedisModule_CloseKey(key); + * RedisModule_ScanCursorDestroy(c); + * + * The function will return 1 if there are more elements to scan and 0 otherwise, + * possibly setting errno if the call failed. + * It is also possible to restart and existing cursor using RM_CursorRestart. */ +int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) { + if (key == NULL || key->value == NULL) { + errno = EINVAL; + return 0; + } + dict *ht = NULL; + robj *o = key->value; + if (o->type == OBJ_SET) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_HASH) { + if (o->encoding == OBJ_ENCODING_HT) + ht = o->ptr; + } else if (o->type == OBJ_ZSET) { + if (o->encoding == OBJ_ENCODING_SKIPLIST) + ht = ((zset *)o->ptr)->dict; + } else { + errno = EINVAL; + return 0; + } + if (cursor->done) { + errno = ENOENT; + return 0; + } + int ret = 1; + if (ht) { + ScanKeyCBData data = { key, privdata, fn }; + cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data); + if (cursor->cursor == 0) { + cursor->done = 1; + ret = 0; + } + } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_INTSET) { + int pos = 0; + int64_t ll; + while(intsetGet(o->ptr,pos++,&ll)) { + robj *field = createStringObjectFromLongLong(ll); + fn(key, field, NULL, privdata); + decrRefCount(field); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) { + unsigned char *p = ziplistIndex(o->ptr,0); + unsigned char *vstr; + unsigned int vlen; + long long vll; + while(p) { + ziplistGet(p,&vstr,&vlen,&vll); + robj *field = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + p = ziplistNext(o->ptr,p); + ziplistGet(p,&vstr,&vlen,&vll); + robj *value = (vstr != NULL) ? + createStringObject((char*)vstr,vlen) : + createStringObjectFromLongLong(vll); + fn(key, field, value, privdata); + p = ziplistNext(o->ptr,p); + decrRefCount(field); + decrRefCount(value); + } + cursor->cursor = 1; + cursor->done = 1; + ret = 0; + } + errno = 0; return ret; } @@ -7076,10 +7212,11 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); - REGISTER_API(Scan); - REGISTER_API(CursorCreate); - REGISTER_API(CursorDestroy); - REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); + REGISTER_API(ScanCursorCreate); + REGISTER_API(ScanCursorDestroy); + REGISTER_API(ScanCursorRestart); + REGISTER_API(Scan); + REGISTER_API(ScanKey); } diff --git a/src/redismodule.h b/src/redismodule.h index c74772d0f..07e1452e1 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,7 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; -typedef struct RedisModuleCursor RedisModuleCursor; +typedef struct RedisModuleScanCursor RedisModuleScanCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -410,7 +410,8 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); -typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); +typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata); +typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -590,6 +591,11 @@ int REDISMODULE_API_FUNC(RedisModule_GetLRUOrLFU)(RedisModuleKey *key, long long RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); +RedisModuleScanCursor *REDISMODULE_API_FUNC(RedisModule_ScanCursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor); +void REDISMODULE_API_FUNC(RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata); +int REDISMODULE_API_FUNC(RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -635,10 +641,6 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); -RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)(); -void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor); -void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor); -int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -805,6 +807,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); + REDISMODULE_GET_API(ScanCursorCreate); + REDISMODULE_GET_API(ScanCursorRestart); + REDISMODULE_GET_API(ScanCursorDestroy); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(ScanKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); @@ -848,10 +855,6 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); - REDISMODULE_GET_API(Scan); - REDISMODULE_GET_API(CursorCreate); - REDISMODULE_GET_API(CursorRestart); - REDISMODULE_GET_API(CursorDestroy); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/tests/modules/scan.c b/tests/modules/scan.c index 21071720a..afede244b 100644 --- a/tests/modules/scan.c +++ b/tests/modules/scan.c @@ -1,62 +1,109 @@ -#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" #include #include #include -#define UNUSED(V) ((void) V) +typedef struct { + size_t nkeys; +} scan_strings_pd; -typedef struct scan_pd{ - size_t nkeys; - RedisModuleCtx *ctx; -} scan_pd; +void scan_strings_callback(RedisModuleCtx *ctx, RedisModuleString* keyname, RedisModuleKey* key, void *privdata) { + scan_strings_pd* pd = privdata; + int was_opened = 0; + if (!key) { + key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ); + was_opened = 1; + } -void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){ - scan_pd* pd = privdata; - RedisModule_ReplyWithArray(pd->ctx, 2); - - RedisModule_ReplyWithString(pd->ctx, keyname); - if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){ + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING) { size_t len; char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); - RedisModule_ReplyWithStringBuffer(pd->ctx, data, len); - }else{ - RedisModule_ReplyWithNull(pd->ctx); + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithString(ctx, keyname); + RedisModule_ReplyWithStringBuffer(ctx, data, len); + pd->nkeys++; } - pd->nkeys++; + if (was_opened) + RedisModule_CloseKey(key); } -int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int scan_strings(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - scan_pd pd = { - .nkeys = 0, - .ctx = ctx, + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + scan_strings_pd pd = { + .nkeys = 0, }; RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); - RedisModuleCursor* cursor = RedisModule_CursorCreate(); - while(RedisModule_Scan(ctx, cursor, scan_callback, &pd)); - RedisModule_CursorDestroy(cursor); + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_strings_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); RedisModule_ReplySetArrayLength(ctx, pd.nkeys); - return 0; + return REDISMODULE_OK; +} + +typedef struct { + RedisModuleCtx *ctx; + size_t nreplies; +} scan_key_pd; + +void scan_key_callback(RedisModuleKey *key, RedisModuleString* field, RedisModuleString* value, void *privdata) { + REDISMODULE_NOT_USED(key); + scan_key_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + RedisModule_ReplyWithString(pd->ctx, field); + if (value) + RedisModule_ReplyWithString(pd->ctx, value); + else + RedisModule_ReplyWithNull(pd->ctx); + pd->nreplies++; +} + +int scan_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc != 2) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + scan_key_pd pd = { + .ctx = ctx, + .nreplies = 0, + }; + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (!key) { + RedisModule_ReplyWithError(ctx, "not found"); + return REDISMODULE_OK; + } + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleScanCursor* cursor = RedisModule_ScanCursorCreate(); + while(RedisModule_ScanKey(key, cursor, scan_key_callback, &pd)); + RedisModule_ScanCursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nreplies); + RedisModule_CloseKey(key); + return REDISMODULE_OK; } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - UNUSED(argv); - UNUSED(argc); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx, "scan.scan_strings", scan_strings, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scan_key", scan_key, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; return REDISMODULE_OK; } - - - diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl index 5a77e8195..de1672e0a 100644 --- a/tests/unit/moduleapi/scan.tcl +++ b/tests/unit/moduleapi/scan.tcl @@ -1,18 +1,47 @@ set testmodule [file normalize tests/modules/scan.so] -proc count_log_message {pattern} { - set result [exec grep -c $pattern < [srv 0 stdout]] -} - start_server {tags {"modules"}} { r module load $testmodule - test {Module scan} { - # the module create a scan command which also return values + test {Module scan keyspace} { + # the module create a scan command with filtering which also return values r set x 1 r set y 2 r set z 3 - lsort [r scan.scankeysvalues] + r hset h f v + lsort [r scan.scan_strings] } {{x 1} {y 2} {z 3}} -} \ No newline at end of file + test {Module scan hash ziplist} { + r hmset hh f1 v1 f2 v2 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2}} + + test {Module scan hash dict} { + r config set hash-max-ziplist-entries 2 + r hmset hh f3 v3 + lsort [r scan.scan_key hh] + } {{f1 v1} {f2 v2} {f3 v3}} + + test {Module scan zset ziplist} { + r zadd zz 1 f1 2 f2 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2}} + + test {Module scan zset dict} { + r config set zset-max-ziplist-entries 2 + r zadd zz 3 f3 + lsort [r scan.scan_key zz] + } {{f1 1} {f2 2} {f3 3}} + + test {Module scan set intset} { + r sadd ss 1 2 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}}} + + test {Module scan set dict} { + r config set set-max-intset-entries 2 + r sadd ss 3 + lsort [r scan.scan_key ss] + } {{1 {}} {2 {}} {3 {}}} +} From 253d9d6d12c5a19ae8faa7632068772845b4a552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=9C=E6=AC=A2=E5=85=B0=E8=8A=B1=E5=B1=B1=E4=B8=98?= Date: Wed, 13 Nov 2019 10:14:45 +0800 Subject: [PATCH 14/34] Update adlist.h Update listGetFree keep format consistent --- src/adlist.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adlist.h b/src/adlist.h index c954fac87..28b9016ce 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -66,7 +66,7 @@ typedef struct list { #define listSetMatchMethod(l,m) ((l)->match = (m)) #define listGetDupMethod(l) ((l)->dup) -#define listGetFree(l) ((l)->free) +#define listGetFreeMethod(l) ((l)->free) #define listGetMatchMethod(l) ((l)->match) /* Prototypes */ From 4a12047c61570ec3a4dbf9513b0881b41b224399 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Wed, 13 Nov 2019 16:43:07 +0530 Subject: [PATCH 15/34] XADD with ID 0-0 stores an empty key Calling XADD with 0-0 or 0 would result in creating an empty key and storing it in the database. Even worse, because XADD will reply with error the action will not be replicated, creating a master-replica inconsistency --- src/t_stream.c | 8 ++++++++ tests/unit/type/stream.tcl | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index 58b59f521..9bf87831f 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1220,6 +1220,14 @@ void xaddCommand(client *c) { return; } + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + * a new stream and have streamAppendItem fail, leaving an empty key in the + * database. */ + if (id_given && id.ms == 0 && id.seq == 0) { + addReplyError(c,"The ID specified in XADD must be greater than 0-0"); + return; + } + /* Lookup the stream at key. */ robj *o; stream *s; diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index aa9c5f3a9..a4431c654 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -123,6 +123,12 @@ start_server { assert {[r xlen mystream] == $j} } + test {XADD with ID 0-0} { + r DEL mystream + catch {r XADD mystream 0-0 k v} err + assert {[r EXISTS mystream] == 0} + } + test {XRANGE COUNT works as expected} { assert {[llength [r xrange mystream - + COUNT 10]] == 10} } From 9c76875f413190a547244e72a3f9e9bbeb6811e9 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Fri, 23 Jun 2017 10:29:49 +0300 Subject: [PATCH 16/34] Add RM_ModuleTypeReplaceValue. This is a light-weight replace function, useful for use cases such as realloc()ing an existing value, etc. Using RM_ModuleTypeSetValue() in such cases is wasteful and complex as it attempts to delete the old value, call its destructor, etc. --- src/module.c | 29 ++++++++++++++++++++++++++++- src/redismodule.h | 2 ++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..0f22178c9 100644 --- a/src/module.c +++ b/src/module.c @@ -1950,7 +1950,7 @@ int RM_DeleteKey(RedisModuleKey *key) { return REDISMODULE_OK; } -/* If the key is open for writing, unlink it (that is delete it in a +/* If the key is open for writing, unlink it (that is delete it in a * non-blocking way, not reclaiming memory immediately) and setup the key to * accept new writes as an empty key (that will be created on demand). * On success REDISMODULE_OK is returned. If the key is not open for @@ -6768,6 +6768,32 @@ int RM_GetLRUOrLFU(RedisModuleKey *key, long long *lfu_freq, long long *lru_idle return REDISMODULE_OK; } +/* Replace the value assigned to a module type. + * + * The key must be open for writing, have an existing value, and have a moduleType + * that matches the one specified by the caller. + * + * Unlike RM_ModuleTypeSetValue() which will free the old value, this function + * simply swaps the old value with the new value. + * + * The function returns the old value, or NULL if any of the above conditions is + * not met. + */ +void *RM_ModuleTypeReplaceValue(RedisModuleKey *key, moduleType *mt, void *new_value) { + if (!(key->mode & REDISMODULE_WRITE) || key->iter) + return NULL; + if (!key->value || key->value->type != OBJ_MODULE) + return NULL; + + moduleValue *mv = key->value->ptr; + if (mv->type != mt) + return NULL; + + void *old_val = mv->value; + mv->value = new_value; + return old_val; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -6857,6 +6883,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(PoolAlloc); REGISTER_API(CreateDataType); REGISTER_API(ModuleTypeSetValue); + REGISTER_API(ModuleTypeReplaceValue); REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetValue); REGISTER_API(IsIOError); diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..8cf789fb0 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -517,6 +517,7 @@ int REDISMODULE_API_FUNC(RedisModule_GetContextFlags)(RedisModuleCtx *ctx); void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods); int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); +void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeReplaceValue)(RedisModuleKey *key, RedisModuleType *mt, void *new_value); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); @@ -726,6 +727,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(PoolAlloc); REDISMODULE_GET_API(CreateDataType); REDISMODULE_GET_API(ModuleTypeSetValue); + REDISMODULE_GET_API(ModuleTypeReplaceValue); REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetValue); REDISMODULE_GET_API(IsIOError); From 2d30afc45fac6d0dcb0e5e12ebdf197d4c77649a Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 14 Nov 2019 09:09:10 +0200 Subject: [PATCH 17/34] module docs, missing LOADING flag --- src/module.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.c b/src/module.c index ad34e7b64..ceb18a557 100644 --- a/src/module.c +++ b/src/module.c @@ -1749,6 +1749,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before * reaching the maxmemory level. * + * * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF + * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master. * * * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to From c426bbf3a54939775fceac1a318f2fa22778ee08 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 14 Nov 2019 09:46:46 +0200 Subject: [PATCH 18/34] Slightly more efficient RM_ReplyWithEmptyString trimming talk about RESP protocol from API docs (should be independent to that anyway) --- src/module.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..6a6db3fe8 100644 --- a/src/module.c +++ b/src/module.c @@ -1389,7 +1389,7 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c, "", 0); + addReply(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1404,8 +1404,7 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) return REDISMODULE_OK; } -/* Reply to the client with a NULL. In the RESP protocol a NULL is encoded - * as the string "$-1\r\n". +/* Reply to the client with a NULL. * * The function always returns REDISMODULE_OK. */ int RM_ReplyWithNull(RedisModuleCtx *ctx) { From 8d50a8327e9f77f70ac7c11edd41a74a193ed830 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 Nov 2019 12:48:54 +0100 Subject: [PATCH 19/34] Rax library updated. --- src/rax.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rax.c b/src/rax.c index be474b058..29b74ae90 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1673,6 +1673,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { * node, but will be our match, representing the key "f". * * So in that case, we don't seek backward. */ + it->data = raxGetData(it->node); } else { if (gt && !raxIteratorNextStep(it,0)) return 0; if (lt && !raxIteratorPrevStep(it,0)) return 0; @@ -1791,7 +1792,7 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key if (eq && key_len == iter->key_len) return 1; else if (lt) return iter->key_len < key_len; else if (gt) return iter->key_len > key_len; - return 0; + else return 0; /* Avoid warning, just 'eq' is handled before. */ } else if (cmp > 0) { return gt ? 1 : 0; } else /* (cmp < 0) */ { From 2f6fe5ce3adb84cfc7506577c49504a8a65aaf2b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 Nov 2019 18:27:37 +0100 Subject: [PATCH 20/34] Expire cycle: introduce the new state needed for the new algo. --- src/db.c | 2 ++ src/expire.c | 5 +++++ src/server.c | 2 ++ src/server.h | 5 ++--- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/db.c b/src/db.c index 8a7ea98ae..e47a4a681 100644 --- a/src/db.c +++ b/src/db.c @@ -1077,10 +1077,12 @@ int dbSwapDatabases(long id1, long id2) { db1->dict = db2->dict; db1->expires = db2->expires; db1->avg_ttl = db2->avg_ttl; + db1->expires_cursor = db2->expires_cursor; db2->dict = aux.dict; db2->expires = aux.expires; db2->avg_ttl = aux.avg_ttl; + db2->expires_cursor = aux.expires_cursor; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list diff --git a/src/expire.c b/src/expire.c index 598b27f96..3d0bae249 100644 --- a/src/expire.c +++ b/src/expire.c @@ -95,6 +95,10 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * executed, where the time limit is a percentage of the REDIS_HZ period * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ +#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ + void activeExpireCycle(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ @@ -231,6 +235,7 @@ void activeExpireCycle(int type) { } elapsed = ustime()-start; + server.stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); /* Update our estimate of keys existing but yet to be expired. diff --git a/src/server.c b/src/server.c index 113d92cbb..8c8b668de 100644 --- a/src/server.c +++ b/src/server.c @@ -2745,6 +2745,7 @@ void resetServerStats(void) { server.stat_expiredkeys = 0; server.stat_expired_stale_perc = 0; server.stat_expired_time_cap_reached_count = 0; + server.stat_expire_cycle_time_used = 0; server.stat_evictedkeys = 0; server.stat_keyspace_misses = 0; server.stat_keyspace_hits = 0; @@ -2848,6 +2849,7 @@ void initServer(void) { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); + server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); diff --git a/src/server.h b/src/server.h index a2c94a22a..2b7974594 100644 --- a/src/server.h +++ b/src/server.h @@ -180,9 +180,6 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ -#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ -#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -721,6 +718,7 @@ typedef struct redisDb { dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ + unsigned long expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; @@ -1167,6 +1165,7 @@ struct redisServer { long long stat_expiredkeys; /* Number of expired keys */ double stat_expired_stale_perc; /* Percentage of keys probably expired */ long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/ + long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ long long stat_keyspace_misses; /* Number of failed lookups of keys */ From ffc7e509aa93c2441f8d40ceab62cd299a18f275 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Nov 2019 10:38:55 +0100 Subject: [PATCH 21/34] Expire cycle: scan hash table buckets directly. --- src/expire.c | 93 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/src/expire.c b/src/expire.c index 3d0bae249..af9eaebf1 100644 --- a/src/expire.c +++ b/src/expire.c @@ -78,24 +78,40 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * it will get more aggressive to avoid that too much memory is used by * keys that can be removed from the keyspace. * - * No more than CRON_DBS_PER_CALL databases are tested at every - * iteration. + * Every expire cycle tests multiple databases: the next call will start + * again from the next db, with the exception of exists for time limit: in that + * case we restart again from the last database we were processing. Anyway + * no more than CRON_DBS_PER_CALL databases are tested at every iteration. * - * This kind of call is used when Redis detects that timelimit_exit is - * true, so there is more work to do, and we do it more incrementally from - * the beforeSleep() function of the event loop. + * The function can perform more or less work, depending on the "type" + * argument. It can execute a "fast cycle" or a "slow cycle". The slow + * cycle is the main way we collect expired cycles: this happens with + * the "server.hz" frequency (usually 10 hertz). * - * Expire cycle type: + * However the slow cycle can exit for timeout, since it used too much time. + * For this reason the function is also invoked to perform a fast cycle + * at every event loop cycle, in the beforeSleep() function. The fast cycle + * will try to perform less work, but will do it much more often. + * + * The following are the details of the two expire cycles and their stop + * conditions: * * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION * microseconds, and is not repeated again before the same amount of time. + * The cycle will also refuse to run at all if the latest slow cycle did not + * terminate because of a time limit condition. * * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is * executed, where the time limit is a percentage of the REDIS_HZ period - * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ + * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the + * fast cycle, the check of every database is interrupted once the number + * of already expired keys in the database is estimated to be lower than + * a given percentage, in order to avoid doing too much work to gain too + * little memory. + */ -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP 20 /* HT buckets checked. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ @@ -152,7 +168,9 @@ void activeExpireCycle(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; + /* Expired and checked in a single loop. */ + unsigned long expired, sampled; + redisDb *db = server.db+(current_db % server.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -176,8 +194,8 @@ void activeExpireCycle(int type) { slots = dictSlots(db->expires); now = mstime(); - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... + /* When there are less than 1% filled slots, sampling the key + * space is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; @@ -185,27 +203,47 @@ void activeExpireCycle(int type) { /* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. */ expired = 0; + sampled = 0; ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; + if (num > ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP) + num = ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP; - while (num--) { - dictEntry *de; - long long ttl; + /* Here we access the low level representation of the hash table + * for speed concerns: this makes this code coupled with dict.c, + * but it hardly changed in ten years. */ + while (sampled < num) { + for (int table = 0; table < 2; table++) { + if (table == 1 && !dictIsRehashing(db->expires)) break; - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; + unsigned long idx = db->expires_cursor; + idx &= db->expires->ht[table].sizemask; + dictEntry *de = db->expires->ht[table].table[idx]; + long long ttl; + + /* Scan the current bucket of the current table. */ + while(de) { + /* Get the next entry now since this entry may get + * deleted. */ + dictEntry *e = de; + de = de->next; + + ttl = dictGetSignedIntegerVal(e)-now; + if (activeExpireCycleTryExpire(db,e,now)) expired++; + if (ttl > 0) { + /* We want the average TTL of keys yet + * not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + sampled++; + } } - total_sampled++; + db->expires_cursor++; } total_expired += expired; + total_sampled += sampled; /* Update the average TTL stats for this database. */ if (ttl_samples) { @@ -229,9 +267,10 @@ void activeExpireCycle(int type) { break; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + /* We don't repeat the cycle for the current database if there are + * less than 25% of keys found expired in the current DB. */ + // printf("[%d] Expired %d, sampled %d\n", type, (int) expired, (int) sampled); + } while (expired > sampled/4); } elapsed = ustime()-start; From 27668056808192a3192bc6a79a2bec862a66c94e Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Nov 2019 11:08:03 +0100 Subject: [PATCH 22/34] Expire cycle: tollerate less stale keys, expire cycle CPU in INFO. --- src/expire.c | 31 +++++++++++++++++++------------ src/server.c | 2 ++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/expire.c b/src/expire.c index af9eaebf1..9cb4c4a0a 100644 --- a/src/expire.c +++ b/src/expire.c @@ -111,9 +111,11 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * little memory. */ -#define ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP 20 /* HT buckets checked. */ +#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ -#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Percentage of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which + we do extra efforts. */ void activeExpireCycle(int type) { /* This function has some global state in order to continue the work @@ -133,10 +135,15 @@ void activeExpireCycle(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limit. Also don't repeat a fast cycle for the same period + * for time limit, unless the percentage of estimated stale keys is + * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit) return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; + if (!timelimit_exit && server.stat_expired_stale_perc < + ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE) return; + + if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) + return; + last_fast_cycle = start; } @@ -150,8 +157,8 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time - * per iteration. Since this function gets called with a frequency of + /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU + * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; @@ -207,8 +214,8 @@ void activeExpireCycle(int type) { ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_BUCKETS_PER_LOOP; + if (num > ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP) + num = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP; /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, @@ -268,9 +275,9 @@ void activeExpireCycle(int type) { } } /* We don't repeat the cycle for the current database if there are - * less than 25% of keys found expired in the current DB. */ - // printf("[%d] Expired %d, sampled %d\n", type, (int) expired, (int) sampled); - } while (expired > sampled/4); + * an acceptable amount of stale keys (logically expired but yet + * not reclained). */ + } while ((expired*100/sampled) > ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE); } elapsed = ustime()-start; diff --git a/src/server.c b/src/server.c index 8c8b668de..f834119ee 100644 --- a/src/server.c +++ b/src/server.c @@ -4270,6 +4270,7 @@ sds genRedisInfoString(char *section) { "expired_keys:%lld\r\n" "expired_stale_perc:%.2f\r\n" "expired_time_cap_reached_count:%lld\r\n" + "expire_cycle_cpu_milliseconds:%lld\r\n" "evicted_keys:%lld\r\n" "keyspace_hits:%lld\r\n" "keyspace_misses:%lld\r\n" @@ -4297,6 +4298,7 @@ sds genRedisInfoString(char *section) { server.stat_expiredkeys, server.stat_expired_stale_perc*100, server.stat_expired_time_cap_reached_count, + server.stat_expire_cycle_time_used/1000, server.stat_evictedkeys, server.stat_keyspace_hits, server.stat_keyspace_misses, From 84b01f63dbe28d5541e09313d35deacf4344ab16 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:30:05 +0100 Subject: [PATCH 23/34] Expire cycle: introduce configurable effort. --- src/expire.c | 37 ++++++++++++++++++++++++++++--------- src/server.c | 1 + src/server.h | 2 ++ 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/expire.c b/src/expire.c index 9cb4c4a0a..dd8a42726 100644 --- a/src/expire.c +++ b/src/expire.c @@ -109,6 +109,9 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { * of already expired keys in the database is estimated to be lower than * a given percentage, in order to avoid doing too much work to gain too * little memory. + * + * The configured expire "effort" will modify the baseline parameters in + * order to do more work in both the fast and slow expire cycles. */ #define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ @@ -118,6 +121,21 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { we do extra efforts. */ void activeExpireCycle(int type) { + /* Adjust the running parameters according to the configured expire + * effort. The default effort is 1, and the maximum configurable effort + * is 10. */ + unsigned long + effort = server.active_expire_effort, + config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION * + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, + config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + + 2*effort, + config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- + effort; + if (config_cycle_acceptable_stale < 1) config_cycle_acceptable_stale = 1; + /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Last DB tested. */ @@ -138,10 +156,11 @@ void activeExpireCycle(int type) { * for time limit, unless the percentage of estimated stale keys is * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit && server.stat_expired_stale_perc < - ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE) return; + if (!timelimit_exit && + server.stat_expired_stale_perc < config_cycle_acceptable_stale) + return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) + if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2) return; last_fast_cycle = start; @@ -157,16 +176,16 @@ void activeExpireCycle(int type) { if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU + /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ - timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; + timelimit = config_cycle_slow_time_perc*1000000/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; if (type == ACTIVE_EXPIRE_CYCLE_FAST) - timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ + timelimit = config_cycle_fast_duration; /* in microseconds. */ /* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still @@ -214,8 +233,8 @@ void activeExpireCycle(int type) { ttl_sum = 0; ttl_samples = 0; - if (num > ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP; + if (num > config_keys_per_loop) + num = config_keys_per_loop; /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, @@ -277,7 +296,7 @@ void activeExpireCycle(int type) { /* We don't repeat the cycle for the current database if there are * an acceptable amount of stale keys (logically expired but yet * not reclained). */ - } while ((expired*100/sampled) > ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE); + } while ((expired*100/sampled) > config_cycle_acceptable_stale); } elapsed = ustime()-start; diff --git a/src/server.c b/src/server.c index f834119ee..c811b869d 100644 --- a/src/server.c +++ b/src/server.c @@ -2294,6 +2294,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.active_expire_effort = CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT; server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; diff --git a/src/server.h b/src/server.h index 2b7974594..51dce955d 100644 --- a/src/server.h +++ b/src/server.h @@ -179,6 +179,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL 10 /* 10% tracking table max fill. */ +#define CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT 1 /* From 1 to 10. */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -1204,6 +1205,7 @@ struct redisServer { int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ + int active_expire_effort; /* From 1 (default) to 10, active effort. */ int active_defrag_enabled; int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ From 1ccc270a2cb3f489e85560f1c953b105997c8b1a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:33:40 +0100 Subject: [PATCH 24/34] Expire cycle: make expire effort configurable. --- src/config.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/config.c b/src/config.c index 505dabc9c..de8e8d8cf 100644 --- a/src/config.c +++ b/src/config.c @@ -256,7 +256,7 @@ void loadServerConfigFromString(char *config) { for (configYesNo *config = configs_yesno; config->name != NULL; config++) { if ((!strcasecmp(argv[0],config->name) || (config->alias && !strcasecmp(argv[0],config->alias))) && - (argc == 2)) + (argc == 2)) { if ((*(config->config) = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -580,6 +580,14 @@ void loadServerConfigFromString(char *config) { err = "active-defrag-max-scan-fields must be positive"; goto loaderr; } + } else if (!strcasecmp(argv[0],"active-expire-effort") && argc == 2) { + server.active_expire_effort = atoi(argv[1]); + if (server.active_expire_effort < 1 || + server.active_expire_effort > 10) + { + err = "active-expire-effort must be between 1 and 10"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { @@ -1165,6 +1173,8 @@ void configSetCommand(client *c) { "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { } config_set_numerical_field( "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) { + } config_set_numerical_field( + "active-expire-effort",server.active_expire_effort,1,10) { } config_set_numerical_field( "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){ } config_set_numerical_field( @@ -1478,6 +1488,7 @@ void configGetCommand(client *c) { config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min); config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max); config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields); + config_get_numerical_field("active-expire-effort",server.active_expire_effort); config_get_numerical_field("auto-aof-rewrite-percentage", server.aof_rewrite_perc); config_get_numerical_field("auto-aof-rewrite-min-size", @@ -2327,6 +2338,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); + rewriteConfigNumericalOption(state,"active-expire-effort",server.active_expire_effort,CONFIG_DEFAULT_ACTIVE_EXPIRE_EFFORT); rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); From 2ab51a644d3d390df50dc1bc59958a15affeb341 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 11:43:42 +0100 Subject: [PATCH 25/34] Expire cycle: fix parameters computation. --- src/expire.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/expire.c b/src/expire.c index dd8a42726..ea7e2b456 100644 --- a/src/expire.c +++ b/src/expire.c @@ -125,16 +125,15 @@ void activeExpireCycle(int type) { * effort. The default effort is 1, and the maximum configurable effort * is 10. */ unsigned long - effort = server.active_expire_effort, + effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */ config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, - config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION * + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2*effort, config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- effort; - if (config_cycle_acceptable_stale < 1) config_cycle_acceptable_stale = 1; /* This function has some global state in order to continue the work * incrementally across calls. */ From e8ceba4e64d6ae7ce8baef90785b4f758e84f5e7 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 17:47:19 +0100 Subject: [PATCH 26/34] Expire cycle: set a buckets limit as well. --- src/expire.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/expire.c b/src/expire.c index ea7e2b456..b4ab9ab18 100644 --- a/src/expire.c +++ b/src/expire.c @@ -237,8 +237,18 @@ void activeExpireCycle(int type) { /* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, - * but it hardly changed in ten years. */ - while (sampled < num) { + * but it hardly changed in ten years. + * + * Note that certain places of the hash table may be empty, + * so we want also a stop condition about the number of + * buckets that we scanned. However scanning for free buckets + * is very fast: we are in the cache line scanning a sequential + * array of NULL pointers, so we can scan a lot more buckets + * than keys in the same time. */ + long max_buckets = num*20; + long checked_buckets = 0; + + while (sampled < num && checked_buckets < max_buckets) { for (int table = 0; table < 2; table++) { if (table == 1 && !dictIsRehashing(db->expires)) break; @@ -248,6 +258,7 @@ void activeExpireCycle(int type) { long long ttl; /* Scan the current bucket of the current table. */ + checked_buckets++; while(de) { /* Get the next entry now since this entry may get * deleted. */ From 3243252cb05869ed4abd49f06e45d7eac4912298 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 18 Nov 2019 18:11:38 +0100 Subject: [PATCH 27/34] Expire cycle: document expire effort in redis.conf. --- redis.conf | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/redis.conf b/redis.conf index 0ec3321a5..39e21b5e7 100644 --- a/redis.conf +++ b/redis.conf @@ -813,11 +813,11 @@ replica-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: # -# volatile-lru -> Evict using approximated LRU among the keys with an expire set. +# volatile-lru -> Evict using approximated LRU, only keys with an expire set. # allkeys-lru -> Evict any key using approximated LRU. -# volatile-lfu -> Evict using approximated LFU among the keys with an expire set. +# volatile-lfu -> Evict using approximated LFU, only keys with an expire set. # allkeys-lfu -> Evict any key using approximated LFU. -# volatile-random -> Remove a random key among the ones with an expire set. +# volatile-random -> Remove a random key having an expire set. # allkeys-random -> Remove a random key, any key. # volatile-ttl -> Remove the key with the nearest expire time (minor TTL) # noeviction -> Don't evict anything, just return an error on write operations. @@ -872,6 +872,23 @@ replica-priority 100 # # replica-ignore-maxmemory yes +# Redis reclaims expired keys in two ways: upon access when those keys are +# found to be expired, and also in background, in what is called the +# "active expire key". The key space is slowly and interactively scanned +# looking for expired keys to reclaim, so that it is possible to free memory +# of keys that are expired and will never be accessed again in a short time. +# +# The default effort of the expire cycle will try to avoid having more than +# ten percent of expired keys still in memory, and will try to avoid consuming +# more than 25% of total memory and to add latency to the system. However +# it is possible to increase the expire "effort" that is normally set to +# "1", to a greater value, up to the value "10". At its maximum value the +# system will use more CPU, longer cycles (and technically may introduce +# more latency), and will tollerate less already expired keys still present +# in the system. It's a tradeoff betweeen memory, CPU and latecy. +# +# active-expire-effort 1 + ############################# LAZY FREEING #################################### # Redis has two primitives to delete keys. One is called DEL and is a blocking From 7c95e89ec320c2f8ecc767e7dc5e93abfbbd07bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=9C=E6=AC=A2=E5=85=B0=E8=8A=B1=E5=B1=B1=E4=B8=98?= Date: Tue, 19 Nov 2019 17:23:47 +0800 Subject: [PATCH 28/34] Update mkreleasehdr.sh fix date +%s errata --- src/mkreleasehdr.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh index e6d558b17..236c26c2b 100755 --- a/src/mkreleasehdr.sh +++ b/src/mkreleasehdr.sh @@ -3,7 +3,7 @@ GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l` BUILD_ID=`uname -n`"-"`date +%s` if [ -n "$SOURCE_DATE_EPOCH" ]; then - BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u %s) + BUILD_ID=$(date -u -d "@$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u -r "$SOURCE_DATE_EPOCH" +%s 2>/dev/null || date -u +%s) fi test -f release.h || touch release.h (cat release.h | grep SHA1 | grep $GIT_SHA1) && \ From 5b80a41caddb9c5fa820de08a3c8646cd1b9640d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:05:55 +0100 Subject: [PATCH 29/34] Remove additional space from comment. --- src/t_stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 9bf87831f..0b07d7110 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1220,7 +1220,7 @@ void xaddCommand(client *c) { return; } - /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating + /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating * a new stream and have streamAppendItem fail, leaving an empty key in the * database. */ if (id_given && id.ms == 0 && id.seq == 0) { From 2d1e893b3e133dcceecef2110d2c41aa8f904b87 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Tue, 19 Nov 2019 12:10:48 +0200 Subject: [PATCH 30/34] Improve RM_Call() errno classification. RM_Call() will now use EBADF and ENONET in addition to EINVAL in order to provide more information about errors (i.e. when return value is NULL). --- src/module.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index ad34e7b64..8796a7ab1 100644 --- a/src/module.c +++ b/src/module.c @@ -3110,7 +3110,9 @@ fmterr: * On success a RedisModuleCallReply object is returned, otherwise * NULL is returned and errno is set to the following values: * - * EINVAL: command non existing, wrong arity, wrong format specifier. + * EBADF: wrong format specifier. + * EINVAL: wrong command arity. + * ENOENT: command does not exist. * EPERM: operation in Cluster instance with key in non local slot. * * This API is documented here: https://redis.io/topics/modules-intro @@ -3142,7 +3144,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* We handle the above format error only when the client is setup so that * we can free it normally. */ if (argv == NULL) { - errno = EINVAL; + errno = EBADF; goto cleanup; } @@ -3154,7 +3156,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - errno = EINVAL; + errno = ENOENT; goto cleanup; } c->cmd = c->lastcmd = cmd; From b42466b92586d1adfeec54bda14d4ad54b0764d6 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:23:43 +0100 Subject: [PATCH 31/34] Fix patch provided in #6554. --- src/blocked.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/blocked.c b/src/blocked.c index dea4cc57a..47bb290a4 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -514,6 +514,13 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); + /* Even if we are not inside call(), increment the call depth + * in order to make sure that keys are expired against a fixed + * reference time, and not against the wallclock time. This + * way we can lookup an object multiple times (BRPOPLPUSH does + * that) without the risk of it being freed in the second + * lookup, invalidating the first one. + * See https://github.com/antirez/redis/pull/6554. */ server.call_depth++; updateCachedTime(0); @@ -533,7 +540,7 @@ void handleClientsBlockedOnKeys(void) { serveClientsBlockedOnKeyByModule(rl); } - server.call_depth++; + server.call_depth--; /* Free this item. */ decrRefCount(rl->key); From ce03d6833213901c95e0b5961b555744d3815bd2 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:28:04 +0100 Subject: [PATCH 32/34] Rename var to fixed_time_expire now that is more general. --- src/blocked.c | 5 ++--- src/db.c | 2 +- src/server.c | 6 +++--- src/server.h | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 47bb290a4..20c0e760a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -521,7 +521,7 @@ void handleClientsBlockedOnKeys(void) { * that) without the risk of it being freed in the second * lookup, invalidating the first one. * See https://github.com/antirez/redis/pull/6554. */ - server.call_depth++; + server.fixed_time_expire++; updateCachedTime(0); /* Serve clients blocked on list key. */ @@ -539,8 +539,7 @@ void handleClientsBlockedOnKeys(void) { * module is trying to accomplish right now. */ serveClientsBlockedOnKeyByModule(rl); } - - server.call_depth--; + server.fixed_time_expire--; /* Free this item. */ decrRefCount(rl->key); diff --git a/src/db.c b/src/db.c index e47a4a681..aac049eb7 100644 --- a/src/db.c +++ b/src/db.c @@ -1220,7 +1220,7 @@ int keyIsExpired(redisDb *db, robj *key) { * may re-open the same key multiple times, can invalidate an already * open object in a next call, if the next call will see the key expired, * while the first did not. */ - else if (server.call_depth > 0) { + else if (server.fixed_time_expire > 0) { now = server.mstime; } /* For the other cases, we want to use the most fresh time we have. */ diff --git a/src/server.c b/src/server.c index c811b869d..2e9a329dd 100644 --- a/src/server.c +++ b/src/server.c @@ -2788,7 +2788,7 @@ void initServer(void) { server.hz = server.config_hz; server.pid = getpid(); server.current_client = NULL; - server.call_depth = 0; + server.fixed_time_expire = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); @@ -3262,7 +3262,7 @@ void call(client *c, int flags) { int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; - server.call_depth++; + server.fixed_time_expire++; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ @@ -3389,7 +3389,7 @@ void call(client *c, int flags) { trackingRememberKeys(caller); } - server.call_depth--; + server.fixed_time_expire--; server.stat_numcommands++; } diff --git a/src/server.h b/src/server.h index 51dce955d..5da7b4960 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ - long call_depth; /* call() re-entering count. */ + long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ From 936e01e5bb286d9a8701b2d71d8e90f5b7168475 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:49:05 +0100 Subject: [PATCH 33/34] Fix stream test after addition of 0-0 ID test. --- tests/unit/type/stream.tcl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index a4431c654..656bac5de 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -124,9 +124,9 @@ start_server { } test {XADD with ID 0-0} { - r DEL mystream - catch {r XADD mystream 0-0 k v} err - assert {[r EXISTS mystream] == 0} + r DEL otherstream + catch {r XADD otherstream 0-0 k v} err + assert {[r EXISTS otherstream] == 0} } test {XRANGE COUNT works as expected} { From fe5aea38c35e3fc35a744ad2de73543df553ae48 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 19 Nov 2019 11:56:02 +0100 Subject: [PATCH 34/34] Simplify PR #6551 implementation. --- src/sentinel.c | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index d5f22b97f..42c4d2467 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -465,12 +465,6 @@ struct redisCommand sentinelcmds[] = { {"hello",helloCommand,-2,"no-script fast",0,NULL,0,0,0,0,0} }; -/* List of client types that are killed when an instance becomes a slave */ -const char* killedClientTypes[] = { - "normal", - "pubsub" -}; - /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { @@ -3955,7 +3949,6 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { char portstr[32]; int retval; - unsigned int curType; ll2string(portstr,sizeof(portstr),port); @@ -4000,11 +3993,11 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { * an issue because CLIENT is variadic command, so Redis will not * recognized as a syntax error, and the transaction will not fail (but * only the unsupported command will fail). */ - for (curType = 0; curType < sizeof(killedClientTypes)/sizeof(killedClientTypes[0]); ++curType) { + for (int type = 0; type < 2; type++) { retval = redisAsyncCommand(ri->link->cc, sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", sentinelInstanceMapCommand(ri,"CLIENT"), - killedClientTypes[curType]); + type == 0 ? "normal" : "pubsub"); if (retval == C_ERR) return retval; ri->link->pending_commands++; }