Before evicted and before expired server events are not executed inside an execution unit. (#12733)
Redis 7.2 (#9406) introduced a new modules event, `RedisModuleEvent_Key`. This new event allows the module to read the key data just before it is removed from the database (either deleted, expired, evicted, or overwritten). When the key is removed from the database, either by active expire or eviction. The new event was not called as part of an execution unit. This can cause an issue if the module registers a post notification job inside the event. This job will not be executed atomically with the expiration/eviction operation and will not replicated inside a Multi/Exec. Moreover, the post notification job will be executed right after the event where it is still not safe to perform any write operation, this will violate the promise that post notification job will be called atomically with the operation that triggered it and **only when it is safe to write**. This PR fixes the issue by wrapping each expiration/eviction of a key with an execution unit. This makes sure the entire operation will run atomically and all the post notification jobs will be executed at the end where it is safe to write. Tests were modified to verify the fix.
This commit is contained in:
parent
6223355cf3
commit
0ffb9d2ea9
@ -682,6 +682,7 @@ int performEvictions(void) {
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space. */
|
||||
enterExecutionUnit(1, 0);
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
latencyStartMonitor(eviction_latency);
|
||||
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
|
||||
@ -694,6 +695,7 @@ int performEvictions(void) {
|
||||
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
|
||||
keyobj, db->id);
|
||||
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
|
||||
exitExecutionUnit();
|
||||
postExecutionUnitOperations();
|
||||
decrRefCount(keyobj);
|
||||
keys_freed++;
|
||||
|
@ -54,10 +54,12 @@
|
||||
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
||||
long long t = dictGetSignedIntegerVal(de);
|
||||
if (now > t) {
|
||||
enterExecutionUnit(1, 0);
|
||||
sds key = dictGetKey(de);
|
||||
robj *keyobj = createStringObject(key,sdslen(key));
|
||||
deleteExpiredKeyAndPropagate(db,keyobj);
|
||||
decrRefCount(keyobj);
|
||||
exitExecutionUnit();
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -90,6 +90,10 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha
|
||||
return REDISMODULE_OK; /* do not count the evicted key */
|
||||
}
|
||||
|
||||
if (strncmp(key_str, "before_evicted", 14) == 0) {
|
||||
return REDISMODULE_OK; /* do not count the before_evicted key */
|
||||
}
|
||||
|
||||
RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
|
||||
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
|
||||
return REDISMODULE_OK;
|
||||
@ -186,6 +190,55 @@ static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleSt
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
typedef struct KeySpace_EventPostNotificationCtx {
|
||||
RedisModuleString *triggered_on;
|
||||
RedisModuleString *new_key;
|
||||
} KeySpace_EventPostNotificationCtx;
|
||||
|
||||
static void KeySpace_ServerEventPostNotificationFree(void *pd) {
|
||||
KeySpace_EventPostNotificationCtx *pn_ctx = pd;
|
||||
RedisModule_FreeString(NULL, pn_ctx->new_key);
|
||||
RedisModule_FreeString(NULL, pn_ctx->triggered_on);
|
||||
RedisModule_Free(pn_ctx);
|
||||
}
|
||||
|
||||
static void KeySpace_ServerEventPostNotification(RedisModuleCtx *ctx, void *pd) {
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
KeySpace_EventPostNotificationCtx *pn_ctx = pd;
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "lpush", "!ss", pn_ctx->new_key, pn_ctx->triggered_on);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
|
||||
static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
|
||||
REDISMODULE_NOT_USED(eid);
|
||||
REDISMODULE_NOT_USED(data);
|
||||
if (subevent > 3) {
|
||||
RedisModule_Log(ctx, "warning", "Got an unexpected subevent '%ld'", subevent);
|
||||
return;
|
||||
}
|
||||
static const char* events[] = {
|
||||
"before_deleted",
|
||||
"before_expired",
|
||||
"before_evicted",
|
||||
"before_overwritten",
|
||||
};
|
||||
|
||||
const RedisModuleString *key_name = RedisModule_GetKeyNameFromModuleKey(((RedisModuleKeyInfo*)data)->key);
|
||||
const char *key_str = RedisModule_StringPtrLen(key_name, NULL);
|
||||
|
||||
for (int i = 0 ; i < 4 ; ++i) {
|
||||
const char *event = events[i];
|
||||
if (strncmp(key_str, event , strlen(event)) == 0) {
|
||||
return; /* don't log any event on our tracking keys */
|
||||
}
|
||||
}
|
||||
|
||||
KeySpace_EventPostNotificationCtx *pn_ctx = RedisModule_Alloc(sizeof(*pn_ctx));
|
||||
pn_ctx->triggered_on = RedisModule_HoldString(NULL, (RedisModuleString*)key_name);
|
||||
pn_ctx->new_key = RedisModule_CreateString(NULL, events[subevent], strlen(events[subevent]));
|
||||
RedisModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree);
|
||||
}
|
||||
|
||||
/* This function must be present on each Redis module. It is used in order to
|
||||
* register the commands into the Redis server. */
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
@ -200,6 +253,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
int with_key_events = 0;
|
||||
if (argc >= 1) {
|
||||
const char *arg = RedisModule_StringPtrLen(argv[0], 0);
|
||||
if (strcmp(arg, "with_key_events") == 0) {
|
||||
with_key_events = 1;
|
||||
}
|
||||
}
|
||||
|
||||
RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS);
|
||||
|
||||
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){
|
||||
@ -222,6 +283,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (with_key_events) {
|
||||
if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
|
||||
"write", 0, 0, 0) == REDISMODULE_ERR){
|
||||
return REDISMODULE_ERR;
|
||||
|
@ -314,6 +314,14 @@ start_server {tags {"modules"}} {
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {OK}
|
||||
|
||||
# Explanation of the first multi exec block:
|
||||
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed
|
||||
# {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed
|
||||
# {incr string_total} - post notification job that was registered when string_changed{string_foo} changed
|
||||
# {incr string_total} - post notification job that was registered when string_changed{string_bar} changed
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{lpush l a}
|
||||
@ -355,6 +363,25 @@ start_server {tags {"modules"}} {
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {OK}
|
||||
|
||||
# Explanation of the first multi exec block:
|
||||
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed
|
||||
# {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed
|
||||
# {incr string_total} - post notification job that was registered when string_changed{string_foo} changed
|
||||
# {incr string_total} - post notification job that was registered when string_changed{string_bar} changed
|
||||
#
|
||||
# Explanation of the second multi exec block:
|
||||
# {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {del string_foo} - lazy expiration of string_foo when 'blpop_and_set_multiple_keys' tries to write to it.
|
||||
# {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys'
|
||||
# {incr expired} - the post notification job, registered after string_foo got expired
|
||||
# {incr string_changed{string_foo}} - post notification job triggered when we set string_foo
|
||||
# {incr string_changed{string_bar}} - post notification job triggered when we set string_bar
|
||||
# {incr string_total} - post notification job triggered when we incr 'string_changed{string_foo}'
|
||||
# {incr string_total} - post notification job triggered when we incr 'string_changed{string_bar}'
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{lpush l a}
|
||||
|
@ -1,7 +1,8 @@
|
||||
set testmodule [file normalize tests/modules/postnotifications.so]
|
||||
|
||||
tags "modules" {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
start_server {} {
|
||||
r module load $testmodule with_key_events
|
||||
|
||||
test {Test write on post notification callback} {
|
||||
set repl [attach_to_replication_stream]
|
||||
@ -9,11 +10,12 @@ tags "modules" {
|
||||
r set string_x 1
|
||||
assert_equal {1} [r get string_changed{string_x}]
|
||||
assert_equal {1} [r get string_total]
|
||||
|
||||
|
||||
r set string_x 2
|
||||
assert_equal {2} [r get string_changed{string_x}]
|
||||
assert_equal {2} [r get string_total]
|
||||
|
||||
# the {lpush before_overwritten string_x} is a post notification job registered when 'string_x' was overwritten
|
||||
assert_replication_stream $repl {
|
||||
{multi}
|
||||
{select *}
|
||||
@ -23,6 +25,7 @@ tags "modules" {
|
||||
{exec}
|
||||
{multi}
|
||||
{set string_x 2}
|
||||
{lpush before_overwritten string_x}
|
||||
{incr string_changed{string_x}}
|
||||
{incr string_total}
|
||||
{exec}
|
||||
@ -37,7 +40,7 @@ tags "modules" {
|
||||
assert_equal {OK} [r postnotification.async_set]
|
||||
assert_equal {1} [r get string_changed{string_x}]
|
||||
assert_equal {1} [r get string_total]
|
||||
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{multi}
|
||||
{select *}
|
||||
@ -63,12 +66,14 @@ tags "modules" {
|
||||
fail "Failed waiting for x to expired"
|
||||
}
|
||||
|
||||
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{set x 1}
|
||||
{pexpireat x *}
|
||||
{multi}
|
||||
{del x}
|
||||
{lpush before_expired x}
|
||||
{incr expired}
|
||||
{exec}
|
||||
}
|
||||
@ -85,12 +90,14 @@ tags "modules" {
|
||||
after 10
|
||||
assert_equal {} [r get x]
|
||||
|
||||
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{set x 1}
|
||||
{pexpireat x *}
|
||||
{multi}
|
||||
{del x}
|
||||
{lpush before_expired x}
|
||||
{incr expired}
|
||||
{exec}
|
||||
}
|
||||
@ -108,6 +115,7 @@ tags "modules" {
|
||||
after 10
|
||||
assert_equal {OK} [r set read_x 1]
|
||||
|
||||
# the {lpush before_expired x} is a post notification job registered before 'x' got expired
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{set x 1}
|
||||
@ -115,6 +123,7 @@ tags "modules" {
|
||||
{multi}
|
||||
{set read_x 1}
|
||||
{del x}
|
||||
{lpush before_expired x}
|
||||
{incr expired}
|
||||
{exec}
|
||||
}
|
||||
@ -143,16 +152,18 @@ tags "modules" {
|
||||
r flushall
|
||||
set repl [attach_to_replication_stream]
|
||||
r set x 1
|
||||
r config set maxmemory-policy allkeys-random
|
||||
r config set maxmemory-policy allkeys-random
|
||||
r config set maxmemory 1
|
||||
|
||||
assert_error {OOM *} {r set y 1}
|
||||
|
||||
# the {lpush before_evicted x} is a post notification job registered before 'x' got evicted
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{set x 1}
|
||||
{multi}
|
||||
{del x}
|
||||
{lpush before_evicted x}
|
||||
{incr evicted}
|
||||
{exec}
|
||||
}
|
||||
@ -164,7 +175,8 @@ tags "modules" {
|
||||
set testmodule2 [file normalize tests/modules/keyspace_events.so]
|
||||
|
||||
tags "modules" {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
start_server {} {
|
||||
r module load $testmodule with_key_events
|
||||
r module load $testmodule2
|
||||
test {Test write on post notification callback} {
|
||||
set repl [attach_to_replication_stream]
|
||||
@ -172,7 +184,7 @@ tags "modules" {
|
||||
r set string_x 1
|
||||
assert_equal {1} [r get string_changed{string_x}]
|
||||
assert_equal {1} [r get string_total]
|
||||
|
||||
|
||||
r set string_x 2
|
||||
assert_equal {2} [r get string_changed{string_x}]
|
||||
assert_equal {2} [r get string_total]
|
||||
@ -181,6 +193,7 @@ tags "modules" {
|
||||
assert_equal {1} [r get string_changed{string1_x}]
|
||||
assert_equal {3} [r get string_total]
|
||||
|
||||
# the {lpush before_overwritten string_x} is a post notification job registered before 'string_x' got overwritten
|
||||
assert_replication_stream $repl {
|
||||
{multi}
|
||||
{select *}
|
||||
@ -190,6 +203,7 @@ tags "modules" {
|
||||
{exec}
|
||||
{multi}
|
||||
{set string_x 2}
|
||||
{lpush before_overwritten string_x}
|
||||
{incr string_changed{string_x}}
|
||||
{incr string_total}
|
||||
{exec}
|
||||
@ -202,4 +216,4 @@ tags "modules" {
|
||||
close_replication_stream $repl
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user