Modules: adding a module type for key space notification (#8759)

Adding a new type mask ​for key space notification, REDISMODULE_NOTIFY_MODULE, to enable unique notifications from commands on REDISMODULE_KEYTYPE_MODULE type keys (which is currently unsupported).

Modules can subscribe to a module key keyspace notification by RM_SubscribeToKeyspaceEvents,
and clients by notify-keyspace-events of redis.conf or via the CONFIG SET, with the characters 'd' or 'A' 
(REDISMODULE_NOTIFY_MODULE type mask is part of the '**A**ll' notation for key space notifications).

Refactor: move some pubsub test infra from pubsub.tcl to util.tcl to be re-used by other tests.
This commit is contained in:
Hanna Fadida 2021-04-19 21:33:26 +03:00 committed by GitHub
parent f40ca9cb58
commit 53a4d6c3b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 143 additions and 52 deletions

View File

@ -1623,8 +1623,9 @@ latency-monitor-threshold 0
# x Expired events (events generated every time a key expires) # x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory) # e Evicted events (events generated when a key is evicted for maxmemory)
# t Stream commands # t Stream commands
# d Module key type events
# m Key-miss events (Note: It is not included in the 'A' class) # m Key-miss events (Note: It is not included in the 'A' class)
# A Alias for g$lshzxet, so that the "AKE" string means all the events # A Alias for g$lshzxetd, so that the "AKE" string means all the events
# (Except key-miss events which are excluded from 'A' due to their # (Except key-miss events which are excluded from 'A' due to their
# unique nature). # unique nature).
# #

View File

@ -5851,6 +5851,7 @@ void moduleReleaseGIL(void) {
* - REDISMODULE_NOTIFY_EXPIRED: Expiration events * - REDISMODULE_NOTIFY_EXPIRED: Expiration events
* - REDISMODULE_NOTIFY_EVICTED: Eviction events * - REDISMODULE_NOTIFY_EVICTED: Eviction events
* - REDISMODULE_NOTIFY_STREAM: Stream events * - REDISMODULE_NOTIFY_STREAM: Stream events
* - REDISMODULE_NOTIFY_MODULE: Module types events
* - REDISMODULE_NOTIFY_KEYMISS: Key-miss events * - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
* - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS) * - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
* - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules, * - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules,

View File

@ -56,6 +56,7 @@ int keyspaceEventsStringToFlags(char *classes) {
case 'E': flags |= NOTIFY_KEYEVENT; break; case 'E': flags |= NOTIFY_KEYEVENT; break;
case 't': flags |= NOTIFY_STREAM; break; case 't': flags |= NOTIFY_STREAM; break;
case 'm': flags |= NOTIFY_KEY_MISS; break; case 'm': flags |= NOTIFY_KEY_MISS; break;
case 'd': flags |= NOTIFY_MODULE; break;
default: return -1; default: return -1;
} }
} }
@ -82,6 +83,7 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1); if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1); if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1); if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1);
} }
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1); if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1); if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);

View File

@ -160,13 +160,14 @@ This flag should not be used directly by the module.
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */ #define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */ #define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */
#define REDISMODULE_NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */ #define REDISMODULE_NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */
#define REDISMODULE_NOTIFY_MODULE (1<<13) /* d, module key space notification */
/* Next notification flag, must be updated when adding new flags above! /* Next notification flag, must be updated when adding new flags above!
This flag should not be used directly by the module. This flag should not be used directly by the module.
* Use RedisModule_GetKeyspaceNotificationFlagsAll instead. */ * Use RedisModule_GetKeyspaceNotificationFlagsAll instead. */
#define _REDISMODULE_NOTIFY_NEXT (1<<13) #define _REDISMODULE_NOTIFY_NEXT (1<<14)
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */ #define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_MODULE) /* A */
/* A special pointer that we can use between the core and the module to signal /* A special pointer that we can use between the core and the module to signal
* field deletion, and that is impossible to be a valid pointer. */ * field deletion, and that is impossible to be a valid pointer. */

View File

@ -482,7 +482,8 @@ typedef enum {
#define NOTIFY_STREAM (1<<10) /* t */ #define NOTIFY_STREAM (1<<10) /* t */
#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */ #define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */
#define NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */ #define NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */ #define NOTIFY_MODULE (1<<13) /* d, module key space notification */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_MODULE) /* A flag */
/* Get the first bind addr or NULL */ /* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL) #define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)

View File

@ -38,6 +38,8 @@
/** strores all the keys on which we got 'loaded' keyspace notification **/ /** strores all the keys on which we got 'loaded' keyspace notification **/
RedisModuleDict *loaded_event_log = NULL; RedisModuleDict *loaded_event_log = NULL;
/** stores all the keys on which we got 'module' keyspace notification **/
RedisModuleDict *module_event_log = NULL;
static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(ctx);
@ -78,6 +80,50 @@ static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const cha
return REDISMODULE_OK; return REDISMODULE_OK;
} }
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char* keyName = RedisModule_StringPtrLen(key, NULL);
int nokey;
RedisModule_DictGetC(module_event_log, (void*)keyName, strlen(keyName), &nokey);
if(nokey){
RedisModule_DictSetC(module_event_log, (void*)keyName, strlen(keyName), RedisModule_HoldString(ctx, key));
}
return REDISMODULE_OK;
}
static int cmdNotify(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
if(argc != 2){
return RedisModule_WrongArity(ctx);
}
RedisModule_NotifyKeyspaceEvent(ctx, REDISMODULE_NOTIFY_MODULE, "notify", argv[1]);
RedisModule_ReplyWithNull(ctx);
return REDISMODULE_OK;
}
static int cmdIsModuleKeyNotified(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
if(argc != 2){
return RedisModule_WrongArity(ctx);
}
const char* key = RedisModule_StringPtrLen(argv[1], NULL);
int nokey;
RedisModuleString* keyStr = RedisModule_DictGetC(module_event_log, (void*)key, strlen(key), &nokey);
RedisModule_ReplyWithArray(ctx, 2);
RedisModule_ReplyWithLongLong(ctx, !nokey);
if(nokey){
RedisModule_ReplyWithNull(ctx);
}else{
RedisModule_ReplyWithString(ctx, keyStr);
}
return REDISMODULE_OK;
}
static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
if(argc != 2){ if(argc != 2){
return RedisModule_WrongArity(ctx); return RedisModule_WrongArity(ctx);
@ -171,6 +217,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
} }
loaded_event_log = RedisModule_CreateDict(ctx); loaded_event_log = RedisModule_CreateDict(ctx);
module_event_log = RedisModule_CreateDict(ctx);
int keySpaceAll = RedisModule_GetKeyspaceNotificationFlagsAll(); int keySpaceAll = RedisModule_GetKeyspaceNotificationFlagsAll();
@ -187,6 +234,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_MODULE, KeySpace_NotificationModule) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx,"keyspace.is_module_key_notified", cmdIsModuleKeyNotified,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx,"keyspace.is_key_loaded", cmdIsKeyLoaded,"",0,0,0) == REDISMODULE_ERR){ if (RedisModule_CreateCommand(ctx,"keyspace.is_key_loaded", cmdIsKeyLoaded,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR; return REDISMODULE_ERR;
} }
@ -219,6 +278,16 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) {
RedisModule_FreeString(ctx, val); RedisModule_FreeString(ctx, val);
} }
RedisModule_FreeDict(ctx, loaded_event_log); RedisModule_FreeDict(ctx, loaded_event_log);
RedisModule_DictIteratorStop(iter);
loaded_event_log = NULL; loaded_event_log = NULL;
iter = RedisModule_DictIteratorStartC(module_event_log, "^", NULL, 0);
while((key = RedisModule_DictNextC(iter, &keyLen, (void**)&val))){
RedisModule_FreeString(ctx, val);
}
RedisModule_FreeDict(ctx, module_event_log);
RedisModule_DictIteratorStop(iter);
module_event_log = NULL;
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -713,3 +713,52 @@ proc chi_square_value {res} {
return $x2_value return $x2_value
} }
#subscribe to Pub/Sub channels
proc consume_subscribe_messages {client type channels} {
set numsub -1
set counts {}
for {set i [llength $channels]} {$i > 0} {incr i -1} {
set msg [$client read]
assert_equal $type [lindex $msg 0]
# when receiving subscribe messages the channels names
# are ordered. when receiving unsubscribe messages
# they are unordered
set idx [lsearch -exact $channels [lindex $msg 1]]
if {[string match "*unsubscribe" $type]} {
assert {$idx >= 0}
} else {
assert {$idx == 0}
}
set channels [lreplace $channels $idx $idx]
# aggregate the subscription count to return to the caller
lappend counts [lindex $msg 2]
}
# we should have received messages for channels
assert {[llength $channels] == 0}
return $counts
}
proc subscribe {client channels} {
$client subscribe {*}$channels
consume_subscribe_messages $client subscribe $channels
}
proc unsubscribe {client {channels {}}} {
$client unsubscribe {*}$channels
consume_subscribe_messages $client unsubscribe $channels
}
proc psubscribe {client channels} {
$client psubscribe {*}$channels
consume_subscribe_messages $client psubscribe $channels
}
proc punsubscribe {client {channels {}}} {
$client punsubscribe {*}$channels
consume_subscribe_messages $client punsubscribe $channels
}

View File

@ -67,5 +67,20 @@ tags "modules" {
assert_equal {1} [r get lua] assert_equal {1} [r get lua]
r get x r get x
} {3} } {3}
test {Test module key space event} {
r keyspace.notify x
assert_equal {1 x} [r keyspace.is_module_key_notified x]
}
test "Keyspace notifications: module events test" {
r config set notify-keyspace-events Kd
r del x
set rd1 [redis_deferring_client]
assert_equal {1} [psubscribe $rd1 *]
r keyspace.notify x
assert_equal {pmessage * __keyspace@9__:x notify} [$rd1 read]
$rd1 close
}
} }
} }

View File

@ -1,52 +1,4 @@
start_server {tags {"pubsub network"}} { start_server {tags {"pubsub network"}} {
proc __consume_subscribe_messages {client type channels} {
set numsub -1
set counts {}
for {set i [llength $channels]} {$i > 0} {incr i -1} {
set msg [$client read]
assert_equal $type [lindex $msg 0]
# when receiving subscribe messages the channels names
# are ordered. when receiving unsubscribe messages
# they are unordered
set idx [lsearch -exact $channels [lindex $msg 1]]
if {[string match "*unsubscribe" $type]} {
assert {$idx >= 0}
} else {
assert {$idx == 0}
}
set channels [lreplace $channels $idx $idx]
# aggregate the subscription count to return to the caller
lappend counts [lindex $msg 2]
}
# we should have received messages for channels
assert {[llength $channels] == 0}
return $counts
}
proc subscribe {client channels} {
$client subscribe {*}$channels
__consume_subscribe_messages $client subscribe $channels
}
proc unsubscribe {client {channels {}}} {
$client unsubscribe {*}$channels
__consume_subscribe_messages $client unsubscribe $channels
}
proc psubscribe {client channels} {
$client psubscribe {*}$channels
__consume_subscribe_messages $client psubscribe $channels
}
proc punsubscribe {client {channels {}}} {
$client punsubscribe {*}$channels
__consume_subscribe_messages $client punsubscribe $channels
}
test "Pub/Sub PING" { test "Pub/Sub PING" {
set rd1 [redis_deferring_client] set rd1 [redis_deferring_client]
subscribe $rd1 somechannel subscribe $rd1 somechannel